repos / pico

pico services monorepo
git clone https://github.com/picosh/pico.git

pico / pkg / pobj / storage
Eric Bower  ·  2026-01-25

fs.go

  1package storage
  2
  3import (
  4	"crypto/md5"
  5	"encoding/hex"
  6	"fmt"
  7	"io"
  8	"io/fs"
  9	"log/slog"
 10	"net/http"
 11	"os"
 12	"path"
 13	"path/filepath"
 14	"strings"
 15	"time"
 16
 17	"github.com/google/renameio/v2"
 18	"github.com/picosh/pico/pkg/send/utils"
 19	"github.com/picosh/pico/pkg/shared/mime"
 20)
 21
 22var KB = 1000
 23var MB = KB * 1000
 24
 25// https://stackoverflow.com/a/32482941
 26func dirSize(path string) (int64, error) {
 27	var size int64
 28	err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
 29		if err != nil {
 30			return err
 31		}
 32		if !info.IsDir() {
 33			size += info.Size()
 34		}
 35		return err
 36	})
 37
 38	return size, err
 39}
 40
// StorageFS implements object storage on top of the local filesystem.
// Buckets are directories directly under Dir and objects are files
// within those directories.
type StorageFS struct {
	Dir    string // root directory that holds all bucket directories
	Logger *slog.Logger
}
 45
 46var _ ObjectStorage = &StorageFS{}
 47var _ ObjectStorage = (*StorageFS)(nil)
 48
 49func NewStorageFS(logger *slog.Logger, dir string) (*StorageFS, error) {
 50	return &StorageFS{Logger: logger, Dir: dir}, nil
 51}
 52
 53func (s *StorageFS) GetBucket(name string) (Bucket, error) {
 54	dirPath := filepath.Join(s.Dir, name)
 55	bucket := Bucket{
 56		Name: name,
 57		Path: dirPath,
 58	}
 59	// s.Logger.Info("get bucket", "dir", dirPath)
 60
 61	info, err := os.Stat(dirPath)
 62	if os.IsNotExist(err) {
 63		return bucket, fmt.Errorf("directory does not exist: %v %w", dirPath, err)
 64	}
 65
 66	if err != nil {
 67		return bucket, fmt.Errorf("directory error: %v %w", dirPath, err)
 68
 69	}
 70
 71	if !info.IsDir() {
 72		return bucket, fmt.Errorf("directory is a file, not a directory: %#v", dirPath)
 73	}
 74
 75	return bucket, nil
 76}
 77
 78func (s *StorageFS) UpsertBucket(name string) (Bucket, error) {
 79	s.Logger.Info("upsert bucket", "name", name)
 80	bucket, err := s.GetBucket(name)
 81	if err == nil {
 82		return bucket, nil
 83	}
 84
 85	dir := filepath.Join(s.Dir, name)
 86	s.Logger.Info("bucket not found, creating", "dir", dir, "err", err)
 87	err = os.MkdirAll(dir, os.ModePerm)
 88	if err != nil {
 89		return bucket, err
 90	}
 91
 92	return bucket, nil
 93}
 94
 95func (s *StorageFS) GetBucketQuota(bucket Bucket) (uint64, error) {
 96	dsize, err := dirSize(bucket.Path)
 97	return uint64(dsize), err
 98}
 99
// DeleteBucket will delete all contents regardless if files exist inside of it.
// Like `rm -rf`: removing a bucket whose directory does not exist returns nil.
func (s *StorageFS) DeleteBucket(bucket Bucket) error {
	return os.RemoveAll(bucket.Path)
}
104
105func (s *StorageFS) GetObject(bucket Bucket, fpath string) (utils.ReadAndReaderAtCloser, *ObjectInfo, error) {
106	objInfo := &ObjectInfo{
107		Size:         0,
108		LastModified: time.Time{},
109		Metadata:     make(http.Header),
110		ETag:         "",
111	}
112
113	dat, err := os.Open(filepath.Join(bucket.Path, fpath))
114	if err != nil {
115		return nil, objInfo, err
116	}
117
118	info, err := dat.Stat()
119	if err != nil {
120		_ = dat.Close()
121		return nil, objInfo, err
122	}
123
124	etag := ""
125	// only generate etag if file is less than 10MB
126	if info.Size() <= int64(10*MB) {
127		// calculate etag
128		h := md5.New()
129		if _, err := io.Copy(h, dat); err != nil {
130			_ = dat.Close()
131			return nil, objInfo, err
132		}
133		md5Sum := h.Sum(nil)
134		etag = hex.EncodeToString(md5Sum)
135
136		// reset os.File reader
137		_, err = dat.Seek(0, io.SeekStart)
138		if err != nil {
139			_ = dat.Close()
140			return nil, objInfo, err
141		}
142	}
143
144	objInfo.ETag = etag
145	objInfo.Size = info.Size()
146	objInfo.LastModified = info.ModTime()
147	objInfo.Metadata.Set("content-type", mime.GetMimeType(fpath))
148	return dat, objInfo, nil
149}
150
151func (s *StorageFS) PutObject(bucket Bucket, fpath string, contents io.Reader, entry *utils.FileEntry) (string, int64, error) {
152	loc := filepath.Join(bucket.Path, fpath)
153	err := os.MkdirAll(filepath.Dir(loc), os.ModePerm)
154	if err != nil {
155		return "", 0, err
156	}
157	out, err := renameio.NewPendingFile(loc, renameio.WithPermissions(os.ModePerm))
158	if err != nil {
159		return "", 0, err
160	}
161
162	size, err := io.Copy(out, contents)
163	if err != nil {
164		return "", 0, err
165	}
166
167	if err := out.CloseAtomicallyReplace(); err != nil {
168		return "", 0, err
169	}
170
171	if entry.Mtime > 0 {
172		uTime := time.Unix(entry.Mtime, 0)
173		_ = os.Chtimes(loc, uTime, uTime)
174	}
175
176	return loc, size, nil
177}
178
179func (s *StorageFS) DeleteObject(bucket Bucket, fpath string) error {
180	loc := filepath.Join(bucket.Path, fpath)
181	err := os.Remove(loc)
182	if err != nil {
183		if os.IsNotExist(err) {
184			return nil
185		}
186		return err
187	}
188
189	// traverse up the folder tree and remove all empty folders
190	dir := filepath.Dir(loc)
191	for dir != "" {
192		f, err := os.Open(dir)
193		if err != nil {
194			s.Logger.Info("open dir", "dir", dir, "err", err)
195			break
196		}
197		defer func() {
198			_ = f.Close()
199		}()
200
201		// https://stackoverflow.com/a/30708914
202		contents, err := f.Readdirnames(-1)
203		if err != nil {
204			s.Logger.Info("read dir", "dir", dir, "err", err)
205			break
206		}
207		if len(contents) > 0 {
208			break
209		}
210
211		err = os.Remove(dir)
212		if err != nil {
213			s.Logger.Info("remove dir", "dir", dir, "err", err)
214			break
215		}
216		fp := strings.Split(dir, "/")
217		prefix := ""
218		if strings.HasPrefix(loc, "/") {
219			prefix = "/"
220		}
221		dir = prefix + filepath.Join(fp[:len(fp)-1]...)
222	}
223
224	return nil
225}
226
227func (s *StorageFS) ListBuckets() ([]string, error) {
228	entries, err := os.ReadDir(s.Dir)
229	if err != nil {
230		return []string{}, err
231	}
232
233	buckets := []string{}
234	for _, e := range entries {
235		if !e.IsDir() {
236			continue
237		}
238		buckets = append(buckets, e.Name())
239	}
240	return buckets, nil
241}
242
243func (s *StorageFS) ListObjects(bucket Bucket, dir string, recursive bool) ([]os.FileInfo, error) {
244	fileList := []os.FileInfo{}
245
246	fpath := path.Join(bucket.Path, dir)
247
248	info, err := os.Stat(fpath)
249	if err != nil {
250		if os.IsNotExist(err) {
251			return fileList, nil
252		}
253		return fileList, err
254	}
255
256	if info.IsDir() && !strings.HasSuffix(dir, "/") {
257		fileList = append(fileList, &utils.VirtualFile{
258			FName:    "",
259			FIsDir:   info.IsDir(),
260			FSize:    info.Size(),
261			FModTime: info.ModTime(),
262		})
263
264		return fileList, err
265	}
266
267	var files []utils.VirtualFile
268
269	if recursive {
270		err = filepath.WalkDir(fpath, func(s string, d fs.DirEntry, err error) error {
271			if err != nil {
272				return err
273			}
274			info, err := d.Info()
275			if err != nil {
276				return nil
277			}
278			fname := strings.TrimPrefix(s, fpath)
279			if fname == "" {
280				return nil
281			}
282			// rsync does not expect prefixed `/` so without this `rsync --delete` is borked
283			fname = strings.TrimPrefix(fname, "/")
284			files = append(files, utils.VirtualFile{
285				FName:    fname,
286				FIsDir:   info.IsDir(),
287				FSize:    info.Size(),
288				FModTime: info.ModTime(),
289			})
290			return nil
291		})
292		if err != nil {
293			fileList = append(fileList, info)
294			return fileList, nil
295		}
296	} else {
297		fls, err := os.ReadDir(fpath)
298		if err != nil {
299			fileList = append(fileList, info)
300			return fileList, nil
301		}
302		for _, d := range fls {
303			info, err := d.Info()
304			if err != nil {
305				continue
306			}
307			fp := info.Name()
308			files = append(files, utils.VirtualFile{
309				FName:    fp,
310				FIsDir:   info.IsDir(),
311				FSize:    info.Size(),
312				FModTime: info.ModTime(),
313			})
314		}
315	}
316
317	for _, f := range files {
318		fileList = append(fileList, &f)
319	}
320
321	return fileList, err
322}