repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

commit
b73b460
parent
0b62aa3
author
Eric Bower
date
2025-07-04 12:52:30 -0400 EDT
feat(pgs): standalone pgs binary

This introduces a new cmd binary for pgs: standalone.

This allows end-users to self-host their own version of `pgs` with as
few dependencies as possible.  With this change users can self-host pgs
with (2) deps: caddy and sqlite
26 files changed,  +404, -84
M go.mod
M go.sum
M cmd/pgs/cdn/main.go
+9, -2
 1@@ -15,13 +15,20 @@ import (
 2 	"github.com/picosh/pico/pkg/apps/pgs"
 3 	"github.com/picosh/pico/pkg/cache"
 4 	"github.com/picosh/pico/pkg/shared"
 5+	"github.com/picosh/utils"
 6 	"github.com/prometheus/client_golang/prometheus/promhttp"
 7 )
 8 
 9 func main() {
10-	logger := shared.CreateLogger("pgs-cdn")
11+	withPipe := strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true"
12+	logger := shared.CreateLogger("pgs-cdn", withPipe)
13 	ctx := context.Background()
14-	cfg := pgs.NewPgsConfig(logger, nil, nil)
15+	drain := pgs.CreateSubCacheDrain(ctx, logger)
16+	pubsub := pgs.NewPubsubPipe(drain)
17+	defer func() {
18+		_ = pubsub.Close()
19+	}()
20+	cfg := pgs.NewPgsConfig(logger, nil, nil, drain)
21 	httpCache := pgs.SetupCache(cfg)
22 	router := &pgs.WebRouter{
23 		Cfg:            cfg,
M cmd/pgs/ssh/main.go
+12, -2
 1@@ -1,6 +1,9 @@
 2 package main
 3 
 4 import (
 5+	"context"
 6+	"strings"
 7+
 8 	"github.com/picosh/pico/pkg/apps/pgs"
 9 	pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
10 	"github.com/picosh/pico/pkg/shared"
11@@ -10,7 +13,8 @@ import (
12 
13 func main() {
14 	dbURL := utils.GetEnv("DATABASE_URL", "")
15-	logger := shared.CreateLogger("pgs-ssh")
16+	withPipe := strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true"
17+	logger := shared.CreateLogger("pgs-ssh", withPipe)
18 	dbpool, err := pgsdb.NewDB(dbURL, logger)
19 	if err != nil {
20 		panic(err)
21@@ -20,7 +24,13 @@ func main() {
22 	if err != nil {
23 		panic(err)
24 	}
25-	cfg := pgs.NewPgsConfig(logger, dbpool, st)
26+	ctx := context.Background()
27+	drain := pgs.CreatePubCacheDrain(ctx, logger)
28+	pubsub := pgs.NewPubsubPipe(drain)
29+	defer func() {
30+		_ = pubsub.Close()
31+	}()
32+	cfg := pgs.NewPgsConfig(logger, dbpool, st, pubsub)
33 	killCh := make(chan error)
34 	pgs.StartSshServer(cfg, killCh)
35 }
A cmd/pgs/standalone/main.go
+32, -0
 1@@ -0,0 +1,32 @@
 2+package main
 3+
 4+import (
 5+	"github.com/picosh/pico/pkg/apps/pgs"
 6+	pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
 7+	"github.com/picosh/pico/pkg/shared"
 8+	"github.com/picosh/pico/pkg/shared/storage"
 9+	"github.com/picosh/utils"
10+)
11+
12+func main() {
13+	dbURL := utils.GetEnv("DATABASE_URL", "./data/pgs.sqlite3")
14+	logger := shared.CreateLogger("pgs-standalone", false)
15+	dbpool, err := pgsdb.NewSqliteDB(dbURL, logger)
16+	if err != nil {
17+		panic(err)
18+	}
19+	adapter := storage.GetStorageTypeFromEnv()
20+	st, err := storage.NewStorage(logger, adapter)
21+	if err != nil {
22+		panic(err)
23+	}
24+	pubsub := pgs.NewPubsubChan()
25+	defer func() {
26+		_ = pubsub.Close()
27+	}()
28+	cfg := pgs.NewPgsConfig(logger, dbpool, st, pubsub)
29+	killCh := make(chan error)
30+
31+	go pgs.StartApiServer(cfg)
32+	pgs.StartSshServer(cfg, killCh)
33+}
M cmd/pgs/web/main.go
+12, -2
 1@@ -1,6 +1,9 @@
 2 package main
 3 
 4 import (
 5+	"context"
 6+	"strings"
 7+
 8 	"github.com/picosh/pico/pkg/apps/pgs"
 9 	pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
10 	"github.com/picosh/pico/pkg/shared"
11@@ -10,7 +13,8 @@ import (
12 
13 func main() {
14 	dbURL := utils.GetEnv("DATABASE_URL", "")
15-	logger := shared.CreateLogger("pgs-web")
16+	withPipe := strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true"
17+	logger := shared.CreateLogger("pgs-web", withPipe)
18 	dbpool, err := pgsdb.NewDB(dbURL, logger)
19 	if err != nil {
20 		panic(err)
21@@ -20,6 +24,12 @@ func main() {
22 	if err != nil {
23 		panic(err)
24 	}
25-	cfg := pgs.NewPgsConfig(logger, dbpool, st)
26+	ctx := context.Background()
27+	drain := pgs.CreateSubCacheDrain(ctx, logger)
28+	pubsub := pgs.NewPubsubPipe(drain)
29+	defer func() {
30+		_ = pubsub.Close()
31+	}()
32+	cfg := pgs.NewPgsConfig(logger, dbpool, st, pubsub)
33 	pgs.StartApiServer(cfg)
34 }
M go.mod
+6, -0
 1@@ -63,6 +63,7 @@ require (
 2 	golang.org/x/crypto v0.38.0
 3 	google.golang.org/protobuf v1.36.6
 4 	gopkg.in/yaml.v2 v2.4.0
 5+	modernc.org/sqlite v1.36.2
 6 )
 7 
 8 require (
 9@@ -206,6 +207,7 @@ require (
10 	github.com/nats-io/nats.go v1.40.1 // indirect
11 	github.com/nats-io/nkeys v0.4.10 // indirect
12 	github.com/nats-io/nuid v1.0.1 // indirect
13+	github.com/ncruces/go-strftime v0.1.9 // indirect
14 	github.com/neurosnap/go-jpeg-image-structure v0.0.0-20221010133817-70b1c1ff679e // indirect
15 	github.com/nutsdb/nutsdb v1.0.4 // indirect
16 	github.com/onsi/ginkgo/v2 v2.23.3 // indirect
17@@ -222,6 +224,7 @@ require (
18 	github.com/quic-go/qpack v0.5.1 // indirect
19 	github.com/quic-go/quic-go v0.50.1 // indirect
20 	github.com/redis/rueidis v1.0.56 // indirect
21+	github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
22 	github.com/rivo/uniseg v0.4.7 // indirect
23 	github.com/rogpeppe/go-internal v1.13.2-0.20241226121412-a5dc8ff20d0a // indirect
24 	github.com/rs/xid v1.6.0 // indirect
25@@ -293,5 +296,8 @@ require (
26 	google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 // indirect
27 	gopkg.in/yaml.v3 v3.0.1 // indirect
28 	howett.net/plist v1.0.1 // indirect
29+	modernc.org/libc v1.61.13 // indirect
30+	modernc.org/mathutil v1.7.1 // indirect
31+	modernc.org/memory v1.8.2 // indirect
32 	mvdan.cc/xurls/v2 v2.6.0 // indirect
33 )
M go.sum
+28, -0
 1@@ -605,6 +605,8 @@ github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
 2 github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
 3 github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
 4 github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
 5+github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
 6+github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
 7 github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
 8 github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
 9 github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577 h1:hVmVNttSLNloGsbFKVXAUHonXTd8KKrv30U/8UkloKI=
10@@ -703,6 +705,8 @@ github.com/quic-go/quic-go v0.50.1 h1:unsgjFIUqW8a2oopkY7YNONpV1gYND6Nt9hnt1PN94
11 github.com/quic-go/quic-go v0.50.1/go.mod h1:Vim6OmUvlYdwBhXP9ZVrtGmCMWa3wEqhq3NgYrI8b4E=
12 github.com/redis/rueidis v1.0.56 h1:DwPjFIgas1OMU/uCqBELOonu9TKMYt3MFPq6GtwEWNY=
13 github.com/redis/rueidis v1.0.56/go.mod h1:g660/008FMYmAF46HG4lmcpcgFNj+jCjCAZUUM+wEbs=
14+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
15+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
16 github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
17 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
18 github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
19@@ -1238,6 +1242,30 @@ honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWh
20 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
21 howett.net/plist v1.0.1 h1:37GdZ8tP09Q35o9ych3ehygcsL+HqKSwzctveSlarvM=
22 howett.net/plist v1.0.1/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g=
23+modernc.org/cc/v4 v4.24.4 h1:TFkx1s6dCkQpd6dKurBNmpo+G8Zl4Sq/ztJ+2+DEsh0=
24+modernc.org/cc/v4 v4.24.4/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
25+modernc.org/ccgo/v4 v4.23.16 h1:Z2N+kk38b7SfySC1ZkpGLN2vthNJP1+ZzGZIlH7uBxo=
26+modernc.org/ccgo/v4 v4.23.16/go.mod h1:nNma8goMTY7aQZQNTyN9AIoJfxav4nvTnvKThAeMDdo=
27+modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
28+modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
29+modernc.org/gc/v2 v2.6.3 h1:aJVhcqAte49LF+mGveZ5KPlsp4tdGdAOT4sipJXADjw=
30+modernc.org/gc/v2 v2.6.3/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
31+modernc.org/libc v1.61.13 h1:3LRd6ZO1ezsFiX1y+bHd1ipyEHIJKvuprv0sLTBwLW8=
32+modernc.org/libc v1.61.13/go.mod h1:8F/uJWL/3nNil0Lgt1Dpz+GgkApWh04N3el3hxJcA6E=
33+modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
34+modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
35+modernc.org/memory v1.8.2 h1:cL9L4bcoAObu4NkxOlKWBWtNHIsnnACGF/TbqQ6sbcI=
36+modernc.org/memory v1.8.2/go.mod h1:ZbjSvMO5NQ1A2i3bWeDiVMxIorXwdClKE/0SZ+BMotU=
37+modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
38+modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
39+modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
40+modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
41+modernc.org/sqlite v1.36.2 h1:vjcSazuoFve9Wm0IVNHgmJECoOXLZM1KfMXbcX2axHA=
42+modernc.org/sqlite v1.36.2/go.mod h1:ADySlx7K4FdY5MaJcEv86hTJ0PjedAloTUuif0YS3ws=
43+modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
44+modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
45+modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
46+modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
47 mvdan.cc/xurls/v2 v2.6.0 h1:3NTZpeTxYVWNSokW3MKeyVkz/j7uYXYiMtXRUfmjbgI=
48 mvdan.cc/xurls/v2 v2.6.0/go.mod h1:bCvEZ1XvdA6wDnxY7jPPjEmigDtvtvPXAD/Exa9IMSk=
49 pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw=
M pkg/apps/auth/api.go
+2, -1
 1@@ -806,6 +806,7 @@ func authMux(apiConfig *shared.ApiConfig) *http.ServeMux {
 2 
 3 func StartApiServer() {
 4 	debug := utils.GetEnv("AUTH_DEBUG", "0")
 5+	withPipe := strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true"
 6 
 7 	cfg := &shared.ConfigSite{
 8 		DbURL:         utils.GetEnv("DATABASE_URL", ""),
 9@@ -825,7 +826,7 @@ func StartApiServer() {
10 		panic("must provide PICO_SECRET environment variable")
11 	}
12 
13-	logger := shared.CreateLogger("auth-web")
14+	logger := shared.CreateLogger("auth-web", withPipe)
15 
16 	cfg.Logger = logger
17 
M pkg/apps/auth/api_test.go
+1, -1
1@@ -276,7 +276,7 @@ func mkpath(path string) string {
2 }
3 
4 func setupTest() *shared.ApiConfig {
5-	logger := shared.CreateLogger("auth-test")
6+	logger := shared.CreateLogger("auth-test", false)
7 	cfg := &shared.ConfigSite{
8 		Issuer:        "auth.pico.test",
9 		Domain:        "http://0.0.0.0:3000",
M pkg/apps/feeds/config.go
+4, -1
 1@@ -1,6 +1,8 @@
 2 package feeds
 3 
 4 import (
 5+	"strings"
 6+
 7 	"github.com/picosh/pico/pkg/shared"
 8 	"github.com/picosh/utils"
 9 )
10@@ -12,6 +14,7 @@ func NewConfigSite(service string) *shared.ConfigSite {
11 	protocol := utils.GetEnv("FEEDS_PROTOCOL", "https")
12 	dbURL := utils.GetEnv("DATABASE_URL", "")
13 	sendgridKey := utils.GetEnv("SENDGRID_API_KEY", "")
14+	withPipe := strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true"
15 
16 	return &shared.ConfigSite{
17 		Debug:       debug == "1",
18@@ -23,6 +26,6 @@ func NewConfigSite(service string) *shared.ConfigSite {
19 		Space:       "feeds",
20 		AllowedExt:  []string{".txt"},
21 		HiddenPosts: []string{"_header.txt", "_readme.txt"},
22-		Logger:      shared.CreateLogger(service),
23+		Logger:      shared.CreateLogger(service, withPipe),
24 	}
25 }
M pkg/apps/pastes/config.go
+4, -1
 1@@ -1,6 +1,8 @@
 2 package pastes
 3 
 4 import (
 5+	"strings"
 6+
 7 	"github.com/picosh/pico/pkg/shared"
 8 	"github.com/picosh/utils"
 9 )
10@@ -11,6 +13,7 @@ func NewConfigSite(service string) *shared.ConfigSite {
11 	port := utils.GetEnv("PASTES_WEB_PORT", "3000")
12 	dbURL := utils.GetEnv("DATABASE_URL", "")
13 	protocol := utils.GetEnv("PASTES_PROTOCOL", "https")
14+	withPipe := strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true"
15 
16 	return &shared.ConfigSite{
17 		Debug:        debug == "1",
18@@ -19,7 +22,7 @@ func NewConfigSite(service string) *shared.ConfigSite {
19 		Protocol:     protocol,
20 		DbURL:        dbURL,
21 		Space:        "pastes",
22-		Logger:       shared.CreateLogger(service),
23+		Logger:       shared.CreateLogger(service, withPipe),
24 		MaxAssetSize: int64(3 * utils.MB),
25 	}
26 }
M pkg/apps/pgs/cli.go
+2, -27
 1@@ -1,7 +1,6 @@
 2 package pgs
 3 
 4 import (
 5-	"context"
 6 	"errors"
 7 	"fmt"
 8 	"io"
 9@@ -14,7 +13,6 @@ import (
10 	pgsdb "github.com/picosh/pico/pkg/apps/pgs/db"
11 	"github.com/picosh/pico/pkg/db"
12 	sst "github.com/picosh/pico/pkg/pobj/storage"
13-	"github.com/picosh/pico/pkg/pssh"
14 	"github.com/picosh/pico/pkg/shared"
15 	"github.com/picosh/utils"
16 )
17@@ -513,20 +511,8 @@ func (c *Cmd) cache(projectName string) error {
18 	c.output(fmt.Sprintf("clearing http cache for %s", projectName))
19 
20 	if c.Write {
21-		var ctx context.Context
22-		if s, ok := c.Session.(*pssh.SSHServerConnSession); ok {
23-			ctx = s.Context()
24-		} else {
25-			ctx = context.Background()
26-		}
27-
28-		ctx, cancel := context.WithCancel(ctx)
29-		defer cancel()
30-
31-		send := createPubCacheDrain(ctx, c.Log)
32-
33 		surrogate := getSurrogateKey(c.User.Name, projectName)
34-		return purgeCache(c.Cfg, send, surrogate)
35+		return purgeCache(c.Cfg, c.Cfg.Pubsub, surrogate)
36 	}
37 	return nil
38 }
39@@ -550,18 +536,7 @@ func (c *Cmd) cacheAll() error {
40 	)
41 	c.output("clearing http cache for all sites")
42 	if c.Write {
43-		var ctx context.Context
44-		if s, ok := c.Session.(*pssh.SSHServerConnSession); ok {
45-			ctx = s.Context()
46-		} else {
47-			ctx = context.Background()
48-		}
49-
50-		ctx, cancel := context.WithCancel(ctx)
51-		defer cancel()
52-
53-		send := createPubCacheDrain(ctx, c.Log)
54-		return purgeAllCache(c.Cfg, send)
55+		return purgeAllCache(c.Cfg, c.Cfg.Pubsub)
56 	}
57 	return nil
58 }
M pkg/apps/pgs/config.go
+4, -2
 1@@ -20,9 +20,9 @@ type PgsConfig struct {
 2 	MaxSpecialFileSize int64
 3 	SshHost            string
 4 	SshPort            string
 5-	TxtPrefix          string
 6 	WebPort            string
 7 	WebProtocol        string
 8+	TxtPrefix          string
 9 
10 	// This channel will receive the surrogate key for a project (e.g. static site)
11 	// which will inform the caching layer to clear the cache for that site.
12@@ -33,6 +33,7 @@ type PgsConfig struct {
13 	Logger *slog.Logger
14 	// Where we store the static assets uploaded to our service.
15 	Storage storage.StorageServe
16+	Pubsub  PicoPubsub
17 }
18 
19 func (c *PgsConfig) AssetURL(username, projectName, fpath string) string {
20@@ -66,7 +67,7 @@ var maxAssetSize = int64(10 * utils.MB)
21 // Needs to be small for caching files like _headers and _redirects.
22 var maxSpecialFileSize = int64(5 * utils.KB)
23 
24-func NewPgsConfig(logger *slog.Logger, dbpool pgsdb.PgsDB, st storage.StorageServe) *PgsConfig {
25+func NewPgsConfig(logger *slog.Logger, dbpool pgsdb.PgsDB, st storage.StorageServe, pubsub PicoPubsub) *PgsConfig {
26 	domain := utils.GetEnv("PGS_DOMAIN", "pgs.sh")
27 	port := utils.GetEnv("PGS_WEB_PORT", "3000")
28 	protocol := utils.GetEnv("PGS_PROTOCOL", "https")
29@@ -98,6 +99,7 @@ func NewPgsConfig(logger *slog.Logger, dbpool pgsdb.PgsDB, st storage.StorageSer
30 		DB:                 dbpool,
31 		Logger:             logger,
32 		Storage:            st,
33+		Pubsub:             pubsub,
34 	}
35 
36 	return &cfg
M pkg/apps/pgs/db/postgres.go
+12, -2
 1@@ -109,7 +109,12 @@ func (me *PgsPsqlDB) UpsertProject(userID, projectName, projectDir string) (*db.
 2 		// this just updates the `createdAt` timestamp, useful for book-keeping
 3 		err = me.UpdateProject(userID, projectName)
 4 		if err != nil {
 5-			me.Logger.Error("could not update project", "err", err)
 6+			me.Logger.Error(
 7+				"could not update project",
 8+				"err", err,
 9+				"projectName", projectName,
10+				"projectDir", projectDir,
11+			)
12 			return nil, err
13 		}
14 		return project, nil
15@@ -117,7 +122,12 @@ func (me *PgsPsqlDB) UpsertProject(userID, projectName, projectDir string) (*db.
16 
17 	_, err = me.InsertProject(userID, projectName, projectName)
18 	if err != nil {
19-		me.Logger.Error("could not create project", "err", err)
20+		me.Logger.Error(
21+			"could not create project",
22+			"err", err,
23+			"projectName", projectName,
24+			"projectDir", projectDir,
25+		)
26 		return nil, err
27 	}
28 	return me.FindProjectByName(userID, projectName)
A pkg/apps/pgs/db/sqlite.go
+139, -0
  1@@ -0,0 +1,139 @@
  2+package pgsdb
  3+
  4+import (
  5+	"fmt"
  6+	"log/slog"
  7+
  8+	"github.com/jmoiron/sqlx"
  9+	_ "modernc.org/sqlite"
 10+)
 11+
 12+var sqliteSchema = `
 13+CREATE TABLE IF NOT EXISTS app_users (
 14+	id INTEGER PRIMARY KEY AUTOINCREMENT,
 15+	name TEXT NOT NULL,
 16+	created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
 17+	UNIQUE(name)
 18+);
 19+
 20+CREATE TABLE IF NOT EXISTS public_keys (
 21+	id INTEGER PRIMARY KEY AUTOINCREMENT,
 22+	user_id INTEGER NOT NULL,
 23+	public_key TEXT NOT NULL UNIQUE,
 24+	name TEXT NOT NULL,
 25+	created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
 26+	UNIQUE (user_id, public_key),
 27+	CONSTRAINT public_keys_user_id_fk
 28+		FOREIGN KEY(user_id) REFERENCES app_users(id)
 29+		ON DELETE CASCADE
 30+		ON UPDATE CASCADE
 31+);
 32+
 33+CREATE TABLE IF NOT EXISTS projects (
 34+	id INTEGER PRIMARY KEY AUTOINCREMENT,
 35+	user_id INTEGER NOT NULL,
 36+	name TEXT NOT NULL,
 37+	project_dir TEXT NOT NULL DEFAULT '',
 38+	created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
 39+	updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
 40+	acl BLOB DEFAULT '{"data": [], "type": "public"}' NOT NULL,
 41+	blocked TEXT NOT NULL DEFAULT '',
 42+	UNIQUE (user_id, name),
 43+	CONSTRAINT projects_user_id_fk
 44+		FOREIGN KEY(user_id) REFERENCES app_users(id)
 45+		ON DELETE CASCADE
 46+		ON UPDATE CASCADE
 47+);
 48+
 49+CREATE TABLE IF NOT EXISTS feature_flags (
 50+	id INTEGER PRIMARY KEY AUTOINCREMENT,
 51+	user_id INTEGER NOT NULL,
 52+	name TEXT NOT NULL,
 53+	created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
 54+	expires_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
 55+	data BLOB DEFAULT '{}' NOT NULL,
 56+	payment_history_id INTEGER,
 57+	CONSTRAINT feature_flags_user_id_fk
 58+		FOREIGN KEY(user_id) REFERENCES app_users(id)
 59+		ON DELETE CASCADE
 60+		ON UPDATE CASCADE
 61+);
 62+`
 63+
 64+var sqliteMigrations = []string{
 65+	"", // migration #0 is reserved for schema initialization
 66+}
 67+
 68+func NewSqliteDB(databaseUrl string, logger *slog.Logger) (*PgsPsqlDB, error) {
 69+	var err error
 70+	d := &PgsPsqlDB{
 71+		Logger: logger,
 72+	}
 73+	d.Logger.Info("connecting to sqlite", "databaseUrl", databaseUrl)
 74+
 75+	db, err := SqliteOpen(databaseUrl, logger)
 76+	if err != nil {
 77+		return nil, err
 78+	}
 79+
 80+	d.Db = db
 81+	return d, nil
 82+}
 83+
 84+// Open opens a database connection.
 85+func SqliteOpen(dsn string, logger *slog.Logger) (*sqlx.DB, error) {
 86+	logger.Info("opening db file", "dsn", dsn)
 87+	db, err := sqlx.Connect("sqlite", dsn)
 88+	if err != nil {
 89+		return nil, err
 90+	}
 91+
 92+	err = sqliteUpgrade(db)
 93+	if err != nil {
 94+		_ = db.Close()
 95+		return nil, err
 96+	}
 97+
 98+	return db, nil
 99+}
100+
101+func sqliteUpgrade(db *sqlx.DB) error {
102+	var version int
103+	if err := db.QueryRow("PRAGMA user_version").Scan(&version); err != nil {
104+		return fmt.Errorf("failed to query schema version: %v", err)
105+	}
106+
107+	if version == len(sqliteMigrations) {
108+		return nil
109+	} else if version > len(sqliteMigrations) {
110+		return fmt.Errorf("(version %d) older than schema (version %d)", len(sqliteMigrations), version)
111+	}
112+
113+	tx, err := db.Beginx()
114+	if err != nil {
115+		return err
116+	}
117+	defer func() {
118+		_ = tx.Rollback()
119+	}()
120+
121+	if version == 0 {
122+		if _, err := tx.Exec(sqliteSchema); err != nil {
123+			return fmt.Errorf("failed to initialize schema: %v", err)
124+		}
125+	} else {
126+		for i := version; i < len(sqliteMigrations); i++ {
127+			if _, err := tx.Exec(sqliteMigrations[i]); err != nil {
128+				return fmt.Errorf("failed to execute migration #%v: %v", i, err)
129+			}
130+		}
131+	}
132+
133+	// For some reason prepared statements don't work here
134+	_, err = tx.Exec(fmt.Sprintf("PRAGMA user_version = %d", len(sqliteMigrations)))
135+	if err != nil {
136+		return fmt.Errorf("failed to bump schema version: %v", err)
137+	}
138+
139+	return tx.Commit()
140+}
M pkg/apps/pgs/ssh.go
+1, -1
1@@ -38,7 +38,7 @@ func StartSshServer(cfg *PgsConfig, killCh chan error) {
2 
3 	webTunnel := &tunkit.WebTunnelHandler{
4 		Logger:      logger,
5-		HttpHandler: createHttpHandler(cfg),
6+		HttpHandler: CreateHttpHandler(cfg),
7 	}
8 
9 	// Create a new SSH server
M pkg/apps/pgs/ssh_test.go
+10, -2
 1@@ -39,7 +39,11 @@ func TestSshServerSftp(t *testing.T) {
 2 	if err != nil {
 3 		panic(err)
 4 	}
 5-	cfg := NewPgsConfig(logger, dbpool, st)
 6+	pubsub := NewPubsubChan()
 7+	defer func() {
 8+		_ = pubsub.Close()
 9+	}()
10+	cfg := NewPgsConfig(logger, dbpool, st, pubsub)
11 	done := make(chan error)
12 	prometheus.DefaultRegisterer = prometheus.NewRegistry()
13 	go StartSshServer(cfg, done)
14@@ -123,7 +127,11 @@ func TestSshServerRsync(t *testing.T) {
15 	if err != nil {
16 		panic(err)
17 	}
18-	cfg := NewPgsConfig(logger, dbpool, st)
19+	pubsub := NewPubsubChan()
20+	defer func() {
21+		_ = pubsub.Close()
22+	}()
23+	cfg := NewPgsConfig(logger, dbpool, st, pubsub)
24 	done := make(chan error)
25 	prometheus.DefaultRegisterer = prometheus.NewRegistry()
26 	go StartSshServer(cfg, done)
M pkg/apps/pgs/tunnel.go
+1, -1
1@@ -45,7 +45,7 @@ func getInfoFromUser(user string) (string, string) {
2 	return "", user
3 }
4 
5-func createHttpHandler(cfg *PgsConfig) CtxHttpBridge {
6+func CreateHttpHandler(cfg *PgsConfig) CtxHttpBridge {
7 	return func(ctx *pssh.SSHServerConnSession) http.Handler {
8 		logger := cfg.Logger
9 		asUser, subdomain := getInfoFromUser(ctx.User())
M pkg/apps/pgs/uploader.go
+1, -2
 1@@ -569,7 +569,6 @@ func (h *UploadAssetHandler) writeAsset(s *pssh.SSHServerConnSession, reader io.
 2 // Repeated messages for the same site are grouped so that we only flush once
 3 // per site per 5 seconds.
 4 func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
 5-	send := createPubCacheDrain(ctx, cfg.Logger)
 6 	var pendingFlushes sync.Map
 7 	tick := time.Tick(5 * time.Second)
 8 	for {
 9@@ -580,7 +579,7 @@ func runCacheQueue(cfg *PgsConfig, ctx context.Context) {
10 			go func() {
11 				pendingFlushes.Range(func(key, value any) bool {
12 					pendingFlushes.Delete(key)
13-					err := purgeCache(cfg, send, key.(string))
14+					err := purgeCache(cfg, cfg.Pubsub, key.(string))
15 					if err != nil {
16 						cfg.Logger.Error("failed to clear cache", "err", err.Error())
17 					}
M pkg/apps/pgs/web.go
+1, -2
 1@@ -283,10 +283,9 @@ func (web *WebRouter) checkHandler(w http.ResponseWriter, r *http.Request) {
 2 
 3 func (web *WebRouter) CacheMgmt(ctx context.Context, httpCache *middleware.SouinBaseHandler, notify chan string) {
 4 	storer := httpCache.Storers[0]
 5-	drain := createSubCacheDrain(ctx, web.Cfg.Logger)
 6 
 7 	for {
 8-		scanner := bufio.NewScanner(drain)
 9+		scanner := bufio.NewScanner(web.Cfg.Pubsub)
10 		scanner.Buffer(make([]byte, 32*1024), 32*1024)
11 		for scanner.Scan() {
12 			surrogateKey := strings.TrimSpace(scanner.Text())
M pkg/apps/pgs/web_cache.go
+60, -6
  1@@ -3,6 +3,7 @@ package pgs
  2 import (
  3 	"context"
  4 	"fmt"
  5+	"io"
  6 	"log/slog"
  7 	"time"
  8 
  9@@ -10,11 +11,64 @@ import (
 10 	"github.com/picosh/utils/pipe"
 11 )
 12 
 13+type PicoPubsub interface {
 14+	io.Reader
 15+	io.Writer
 16+	io.Closer
 17+}
 18+
 19+type PubsubPipe struct {
 20+	Pipe *pipe.ReconnectReadWriteCloser
 21+}
 22+
 23+func (p *PubsubPipe) Read(b []byte) (int, error) {
 24+	return p.Pipe.Read(b)
 25+}
 26+
 27+func (p *PubsubPipe) Write(b []byte) (int, error) {
 28+	return p.Pipe.Write(b)
 29+}
 30+
 31+func (p *PubsubPipe) Close() error {
 32+	return p.Pipe.Close()
 33+}
 34+
 35+func NewPubsubPipe(pipe *pipe.ReconnectReadWriteCloser) *PubsubPipe {
 36+	return &PubsubPipe{
 37+		Pipe: pipe,
 38+	}
 39+}
 40+
 41+type PubsubChan struct {
 42+	Chan chan []byte
 43+}
 44+
 45+func (p *PubsubChan) Read(b []byte) (int, error) {
 46+	n := copy(b, <-p.Chan)
 47+	return n, nil
 48+}
 49+
 50+func (p *PubsubChan) Write(b []byte) (int, error) {
 51+	p.Chan <- b
 52+	return len(b), nil
 53+}
 54+
 55+func (p *PubsubChan) Close() error {
 56+	close(p.Chan)
 57+	return nil
 58+}
 59+
 60+func NewPubsubChan() *PubsubChan {
 61+	return &PubsubChan{
 62+		Chan: make(chan []byte),
 63+	}
 64+}
 65+
 66 func getSurrogateKey(userName, projectName string) string {
 67 	return fmt.Sprintf("%s-%s", userName, projectName)
 68 }
 69 
 70-func createPubCacheDrain(ctx context.Context, logger *slog.Logger) *pipe.ReconnectReadWriteCloser {
 71+func CreatePubCacheDrain(ctx context.Context, logger *slog.Logger) *pipe.ReconnectReadWriteCloser {
 72 	info := shared.NewPicoPipeClient()
 73 	send := pipe.NewReconnectReadWriteCloser(
 74 		ctx,
 75@@ -28,7 +82,7 @@ func createPubCacheDrain(ctx context.Context, logger *slog.Logger) *pipe.Reconne
 76 	return send
 77 }
 78 
 79-func createSubCacheDrain(ctx context.Context, logger *slog.Logger) *pipe.ReconnectReadWriteCloser {
 80+func CreateSubCacheDrain(ctx context.Context, logger *slog.Logger) *pipe.ReconnectReadWriteCloser {
 81 	info := shared.NewPicoPipeClient()
 82 	send := pipe.NewReconnectReadWriteCloser(
 83 		ctx,
 84@@ -48,13 +102,13 @@ func createSubCacheDrain(ctx context.Context, logger *slog.Logger) *pipe.Reconne
 85 // cached assets for a given subdomain are grouped under a single key (which is
 86 // separate from the "GET-https-example.com-/path" key used for serving files
 87 // from the cache).
 88-func purgeCache(cfg *PgsConfig, send *pipe.ReconnectReadWriteCloser, surrogate string) error {
 89+func purgeCache(cfg *PgsConfig, writer io.Writer, surrogate string) error {
 90 	cfg.Logger.Info("purging cache", "surrogate", surrogate)
 91 	time.Sleep(1 * time.Second)
 92-	_, err := send.Write([]byte(surrogate + "\n"))
 93+	_, err := writer.Write([]byte(surrogate + "\n"))
 94 	return err
 95 }
 96 
 97-func purgeAllCache(cfg *PgsConfig, send *pipe.ReconnectReadWriteCloser) error {
 98-	return purgeCache(cfg, send, "*")
 99+func purgeAllCache(cfg *PgsConfig, writer io.Writer) error {
100+	return purgeCache(cfg, writer, "*")
101 }
M pkg/apps/pgs/web_test.go
+10, -2
 1@@ -318,7 +318,11 @@ func TestApiBasic(t *testing.T) {
 2 			responseRecorder := httptest.NewRecorder()
 3 
 4 			st, _ := storage.NewStorageMemory(tc.storage)
 5-			cfg := NewPgsConfig(logger, dbpool, st)
 6+			pubsub := NewPubsubChan()
 7+			defer func() {
 8+				_ = pubsub.Close()
 9+			}()
10+			cfg := NewPgsConfig(logger, dbpool, st, pubsub)
11 			cfg.Domain = "pgs.test"
12 			router := NewWebRouter(cfg)
13 			router.ServeHTTP(responseRecorder, request)
14@@ -416,7 +420,11 @@ func TestImageManipulation(t *testing.T) {
15 					Ratio: &storage.Ratio{},
16 				},
17 			}
18-			cfg := NewPgsConfig(logger, dbpool, st)
19+			pubsub := NewPubsubChan()
20+			defer func() {
21+				_ = pubsub.Close()
22+			}()
23+			cfg := NewPgsConfig(logger, dbpool, st, pubsub)
24 			cfg.Domain = "pgs.test"
25 			router := NewWebRouter(cfg)
26 			router.ServeHTTP(responseRecorder, request)
M pkg/apps/pico/config.go
+4, -1
 1@@ -1,6 +1,8 @@
 2 package pico
 3 
 4 import (
 5+	"strings"
 6+
 7 	"github.com/picosh/pico/pkg/shared"
 8 	"github.com/picosh/utils"
 9 )
10@@ -8,11 +10,12 @@ import (
11 func NewConfigSite(service string) *shared.ConfigSite {
12 	dbURL := utils.GetEnv("DATABASE_URL", "")
13 	tuns := utils.GetEnv("TUNS_CONSOLE_SECRET", "")
14+	withPipe := strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true"
15 
16 	return &shared.ConfigSite{
17 		DbURL:      dbURL,
18 		Space:      "pico",
19-		Logger:     shared.CreateLogger(service),
20+		Logger:     shared.CreateLogger(service, withPipe),
21 		TunsSecret: tuns,
22 	}
23 }
M pkg/apps/pipe/config.go
+4, -1
 1@@ -1,6 +1,8 @@
 2 package pipe
 3 
 4 import (
 5+	"strings"
 6+
 7 	"github.com/picosh/pico/pkg/shared"
 8 	"github.com/picosh/utils"
 9 )
10@@ -10,13 +12,14 @@ func NewConfigSite(service string) *shared.ConfigSite {
11 	port := utils.GetEnv("PIPE_WEB_PORT", "3000")
12 	dbURL := utils.GetEnv("DATABASE_URL", "")
13 	protocol := utils.GetEnv("PIPE_PROTOCOL", "https")
14+	withPipe := strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true"
15 
16 	return &shared.ConfigSite{
17 		Domain:   domain,
18 		Port:     port,
19 		Protocol: protocol,
20 		DbURL:    dbURL,
21-		Logger:   shared.CreateLogger(service),
22+		Logger:   shared.CreateLogger(service, withPipe),
23 		Space:    "pipe",
24 	}
25 }
M pkg/apps/prose/config.go
+4, -1
 1@@ -1,6 +1,8 @@
 2 package prose
 3 
 4 import (
 5+	"strings"
 6+
 7 	"github.com/picosh/pico/pkg/shared"
 8 	"github.com/picosh/utils"
 9 )
10@@ -15,6 +17,7 @@ func NewConfigSite(service string) *shared.ConfigSite {
11 	dbURL := utils.GetEnv("DATABASE_URL", "")
12 	maxSize := uint64(25 * utils.MB)
13 	maxImgSize := int64(10 * utils.MB)
14+	withPipe := strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true"
15 
16 	return &shared.ConfigSite{
17 		Debug:    debug == "1",
18@@ -34,7 +37,7 @@ func NewConfigSite(service string) *shared.ConfigSite {
19 			".ico",
20 		},
21 		HiddenPosts:  []string{"_readme.md", "_styles.css", "_footer.md", "_404.md"},
22-		Logger:       shared.CreateLogger(service),
23+		Logger:       shared.CreateLogger(service, withPipe),
24 		MaxSize:      maxSize,
25 		MaxAssetSize: maxImgSize,
26 	}
M pkg/db/db.go
+39, -21
  1@@ -14,6 +14,22 @@ var ErrNameDenied = errors.New("username is on the denylist")
  2 var ErrNameInvalid = errors.New("username has invalid characters in it")
  3 var ErrPublicKeyTaken = errors.New("public key is already associated with another user")
  4 
  5+// sqlite uses string to BLOB type and postgres uses []uint8 for JSONB.
  6+func tcast(value any) ([]byte, error) {
  7+	switch val := value.(type) {
  8+	// sqlite3 BLOB
  9+	case string:
 10+		return []byte(val), nil
 11+	// postgres JSONB: []uint8
 12+	default:
 13+		b, ok := val.([]byte)
 14+		if !ok {
 15+			return []byte{}, errors.New("type assertion to []byte failed")
 16+		}
 17+		return b, nil
 18+	}
 19+}
 20+
 21 type PublicKey struct {
 22 	ID        string     `json:"id" db:"id"`
 23 	UserID    string     `json:"user_id" db:"user_id"`
 24@@ -43,10 +59,10 @@ func (p PostData) Value() (driver.Value, error) {
 25 
 26 // Make the Attrs struct implement the sql.Scanner interface. This method
 27 // simply decodes a JSON-encoded value into the struct fields.
 28-func (p *PostData) Scan(value interface{}) error {
 29-	b, ok := value.([]byte)
 30-	if !ok {
 31-		return errors.New("type assertion to []byte failed")
 32+func (p *PostData) Scan(value any) error {
 33+	b, err := tcast(value)
 34+	if err != nil {
 35+		return err
 36 	}
 37 
 38 	return json.Unmarshal(b, &p)
 39@@ -77,12 +93,11 @@ func (p ProjectAcl) Value() (driver.Value, error) {
 40 
 41 // Make the Attrs struct implement the sql.Scanner interface. This method
 42 // simply decodes a JSON-encoded value into the struct fields.
 43-func (p *ProjectAcl) Scan(value interface{}) error {
 44-	b, ok := value.([]byte)
 45-	if !ok {
 46-		return errors.New("type assertion to []byte failed")
 47+func (p *ProjectAcl) Scan(value any) error {
 48+	b, err := tcast(value)
 49+	if err != nil {
 50+		return err
 51 	}
 52-
 53 	return json.Unmarshal(b, &p)
 54 }
 55 
 56@@ -102,11 +117,12 @@ func (p FeedItemData) Value() (driver.Value, error) {
 57 
 58 // Make the Attrs struct implement the sql.Scanner interface. This method
 59 // simply decodes a JSON-encoded value into the struct fields.
 60-func (p *FeedItemData) Scan(value interface{}) error {
 61-	b, ok := value.([]byte)
 62-	if !ok {
 63-		return errors.New("type assertion to []byte failed")
 64+func (p *FeedItemData) Scan(value any) error {
 65+	b, err := tcast(value)
 66+	if err != nil {
 67+		return err
 68 	}
 69+
 70 	return json.Unmarshal(b, &p)
 71 }
 72 
 73@@ -274,11 +290,12 @@ func (p FeatureFlagData) Value() (driver.Value, error) {
 74 
 75 // Make the Attrs struct implement the sql.Scanner interface. This method
 76 // simply decodes a JSON-encoded value into the struct fields.
 77-func (p *FeatureFlagData) Scan(value interface{}) error {
 78-	b, ok := value.([]byte)
 79-	if !ok {
 80-		return errors.New("type assertion to []byte failed")
 81+func (p *FeatureFlagData) Scan(value any) error {
 82+	b, err := tcast(value)
 83+	if err != nil {
 84+		return err
 85 	}
 86+
 87 	return json.Unmarshal(b, &p)
 88 }
 89 
 90@@ -295,11 +312,12 @@ func (p PaymentHistoryData) Value() (driver.Value, error) {
 91 
 92 // Make the Attrs struct implement the sql.Scanner interface. This method
 93 // simply decodes a JSON-encoded value into the struct fields.
 94-func (p *PaymentHistoryData) Scan(value interface{}) error {
 95-	b, ok := value.([]byte)
 96-	if !ok {
 97-		return errors.New("type assertion to []byte failed")
 98+func (p *PaymentHistoryData) Scan(value any) error {
 99+	b, err := tcast(value)
100+	if err != nil {
101+		return err
102 	}
103+
104 	return json.Unmarshal(b, &p)
105 }
106 
M pkg/shared/config.go
+2, -3
 1@@ -13,7 +13,6 @@ import (
 2 	"time"
 3 
 4 	"github.com/picosh/pico/pkg/db"
 5-	"github.com/picosh/utils"
 6 
 7 	pipeLogger "github.com/picosh/utils/pipe/log"
 8 )
 9@@ -269,7 +268,7 @@ func (c *ConfigSite) AssetURL(username, projectName, fpath string) string {
10 	)
11 }
12 
13-func CreateLogger(space string) *slog.Logger {
14+func CreateLogger(space string, withPipe bool) *slog.Logger {
15 	logger := slog.New(
16 		slog.NewTextHandler(
17 			os.Stdout,
18@@ -280,7 +279,7 @@ func CreateLogger(space string) *slog.Logger {
19 		),
20 	)
21 
22-	if strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true" {
23+	if withPipe {
24 		conn := NewPicoPipeClient()
25 		logger = pipeLogger.RegisterReconnectLogger(context.Background(), logger, conn, 100, 10*time.Millisecond)
26 	}