- commit
- b4b879b
- parent
- e2630f9
- author
- Eric Bower
- date
- 2025-01-17 19:03:00 -0500 EST
chore: setup upload-drain for all pgs/prose files uploaded
12 files changed,
+185,
-143
+51,
-34
1@@ -4,16 +4,14 @@ import (
2 "bufio"
3 "context"
4 "encoding/json"
5- "log/slog"
6 "sync"
7 "time"
8
9 "github.com/picosh/pico/db/postgres"
10- "github.com/picosh/pico/filehandlers"
11+ fileshared "github.com/picosh/pico/filehandlers/shared"
12 "github.com/picosh/pico/prose"
13 "github.com/picosh/pico/shared"
14 "github.com/picosh/pico/shared/storage"
15- "github.com/picosh/utils/pipe"
16 )
17
18 func bail(err error) {
19@@ -22,20 +20,27 @@ func bail(err error) {
20 }
21 }
22
23-func render(ssg *prose.SSG, ch chan string) {
24+type RenderEvent struct {
25+ UserID string
26+ Service string
27+}
28+
29+// run queue on an interval to merge file uploads from same user.
30+func render(ssg *prose.SSG, ch chan RenderEvent) {
31 var pendingFlushes sync.Map
32 tick := time.Tick(10 * time.Second)
33 for {
34 select {
35- case userID := <-ch:
36- ssg.Logger.Info("received request to generate blog", "userId", userID)
37- pendingFlushes.Store(userID, userID)
38+ case event := <-ch:
39+ ssg.Logger.Info("received request to generate blog", "userId", event.UserID)
40+ pendingFlushes.Store(event.UserID, event.Service)
41 case <-tick:
42 ssg.Logger.Info("flushing ssg requests")
43 go func() {
44 pendingFlushes.Range(func(key, value any) bool {
45 pendingFlushes.Delete(key)
46- user, err := ssg.DB.FindUser(value.(string))
47+ event := value.(RenderEvent)
48+ user, err := ssg.DB.FindUser(event.UserID)
49 if err != nil {
50 ssg.Logger.Error("cannot find user", "err", err)
51 return true
52@@ -47,7 +52,7 @@ func render(ssg *prose.SSG, ch chan string) {
53 return true
54 }
55
56- err = ssg.ProseBlog(user, bucket)
57+ err = ssg.ProseBlog(user, bucket, event.Service)
58 if err != nil {
59 ssg.Logger.Error("cannot generate blog", "err", err)
60 }
61@@ -58,20 +63,6 @@ func render(ssg *prose.SSG, ch chan string) {
62 }
63 }
64
65-func createSubProseDrain(ctx context.Context, logger *slog.Logger) *pipe.ReconnectReadWriteCloser {
66- info := shared.NewPicoPipeClient()
67- send := pipe.NewReconnectReadWriteCloser(
68- ctx,
69- logger,
70- info,
71- "sub to prose-drain",
72- "sub prose-drain -k",
73- 100,
74- -1,
75- )
76- return send
77-}
78-
79 func main() {
80 cfg := prose.NewConfigSite()
81 logger := cfg.Logger
82@@ -89,15 +80,15 @@ func main() {
83 }
84
85 ctx := context.Background()
86- drain := createSubProseDrain(ctx, cfg.Logger)
87+ drain := fileshared.CreateSubUploadDrain(ctx, cfg.Logger)
88
89- ch := make(chan string)
90+ ch := make(chan RenderEvent)
91 go render(ssg, ch)
92
93 for {
94 scanner := bufio.NewScanner(drain)
95 for scanner.Scan() {
96- var data filehandlers.SuccesHook
97+ var data fileshared.FileUploaded
98
99 err := json.Unmarshal(scanner.Bytes(), &data)
100 if err != nil {
101@@ -105,26 +96,52 @@ func main() {
102 continue
103 }
104
105+ // we don't care about any other pgs sites so ignore them
106+ if data.Service == "pgs" && data.ProjectName != "prose" {
107+ continue
108+ }
109+
110 logger = logger.With(
111 "userId", data.UserID,
112 "filename", data.Filename,
113 "action", data.Action,
114+ "project", data.ProjectName,
115+ "service", data.Service,
116 )
117
118+ bucket, err := ssg.Storage.GetBucket(shared.GetAssetBucketName(data.UserID))
119+ if err != nil {
120+ ssg.Logger.Error("cannot find bucket", "err", err)
121+ continue
122+ }
123+ user, err := ssg.DB.FindUser(data.UserID)
124+ if err != nil {
125+ logger.Error("cannot find user", "err", err)
126+ continue
127+ }
128+
129 if data.Action == "delete" {
130- bucket, err := ssg.Storage.GetBucket(shared.GetAssetBucketName(data.UserID))
131- if err != nil {
132- ssg.Logger.Error("cannot find bucket", "err", err)
133- continue
134- }
135 err = st.DeleteObject(bucket, data.Filename)
136 if err != nil {
137 logger.Error("cannot delete object", "err", err)
138- continue
139 }
140- ch <- data.UserID
141+ post, err := ssg.DB.FindPostWithFilename(data.Filename, data.UserID, "prose")
142+ if err != nil {
143+ logger.Error("cannot find post", "err", err)
144+ } else {
145+ err = ssg.DB.RemovePosts([]string{post.ID})
146+ if err != nil {
147+ logger.Error("cannot delete post", "err", err)
148+ }
149+ }
150+ ch <- RenderEvent{data.UserID, data.Service}
151 } else if data.Action == "create" || data.Action == "update" {
152- ch <- data.UserID
153+ _, err := ssg.UpsertPost(user.ID, user.Name, bucket, data.Filename)
154+ if err != nil {
155+ logger.Error("cannot upsert post", "err", err)
156+ continue
157+ }
158+ ch <- RenderEvent{data.UserID, data.Service}
159 }
160 }
161 }
+2,
-1
1@@ -11,6 +11,7 @@ import (
2 "github.com/charmbracelet/ssh"
3 "github.com/picosh/pico/db"
4 "github.com/picosh/pico/filehandlers"
5+ fileshared "github.com/picosh/pico/filehandlers/shared"
6 "github.com/picosh/pico/shared"
7 "github.com/picosh/utils"
8 )
9@@ -82,6 +83,6 @@ func (p *FeedHooks) FileMeta(s ssh.Session, data *filehandlers.PostMetaData) err
10 return nil
11 }
12
13-func (p *FeedHooks) FileSuccess(s ssh.Session, data *filehandlers.SuccesHook) error {
14+func (p *FeedHooks) FileSuccess(s ssh.Session, data *fileshared.FileUploaded) error {
15 return nil
16 }
+6,
-4
1@@ -15,7 +15,7 @@ import (
2 "github.com/charmbracelet/ssh"
3 exifremove "github.com/neurosnap/go-exif-remove"
4 "github.com/picosh/pico/db"
5- "github.com/picosh/pico/filehandlers"
6+ fileshared "github.com/picosh/pico/filehandlers/shared"
7 "github.com/picosh/pico/shared"
8 "github.com/picosh/pico/shared/storage"
9 "github.com/picosh/pobj"
10@@ -91,7 +91,7 @@ func (h *UploadImgHandler) Read(s ssh.Session, entry *sendutils.FileEntry) (os.F
11 return fileInfo, reader, nil
12 }
13
14-func (h *UploadImgHandler) Success(s ssh.Session, data *filehandlers.SuccesHook) error {
15+func (h *UploadImgHandler) Success(s ssh.Session, data *fileshared.FileUploaded) error {
16 out, err := json.Marshal(data)
17 if err != nil {
18 return err
19@@ -186,10 +186,11 @@ func (h *UploadImgHandler) Write(s ssh.Session, entry *sendutils.FileEntry) (str
20 return "", err
21 }
22
23- _ = h.Success(s, &filehandlers.SuccesHook{
24+ _ = h.Success(s, &fileshared.FileUploaded{
25 UserID: user.ID,
26 Action: "create",
27 Filename: metadata.Filename,
28+ Service: "prose",
29 })
30
31 curl := shared.NewCreateURL(h.Cfg)
32@@ -250,10 +251,11 @@ func (h *UploadImgHandler) Delete(s ssh.Session, entry *sendutils.FileEntry) err
33 return err
34 }
35
36- _ = h.Success(s, &filehandlers.SuccesHook{
37+ _ = h.Success(s, &fileshared.FileUploaded{
38 UserID: user.ID,
39 Action: "delete",
40 Filename: filename,
41+ Service: "prose",
42 })
43
44 return nil
+6,
-12
1@@ -12,6 +12,7 @@ import (
2
3 "github.com/charmbracelet/ssh"
4 "github.com/picosh/pico/db"
5+ fileshared "github.com/picosh/pico/filehandlers/shared"
6 "github.com/picosh/pico/shared"
7 "github.com/picosh/pico/shared/storage"
8 sendutils "github.com/picosh/send/utils"
9@@ -27,17 +28,10 @@ type PostMetaData struct {
10 Aliases []string
11 }
12
13-type SuccesHook struct {
14- UserID string `json:"user_id"`
15- PostID string `json:"post_id"`
16- Action string `json:"action"`
17- Filename string `json:"filename"`
18-}
19-
20 type ScpFileHooks interface {
21 FileValidate(s ssh.Session, data *PostMetaData) (bool, error)
22 FileMeta(s ssh.Session, data *PostMetaData) error
23- FileSuccess(s ssh.Session, data *SuccesHook) error
24+ FileSuccess(s ssh.Session, data *fileshared.FileUploaded) error
25 }
26
27 type ScpUploadHandler struct {
28@@ -267,11 +261,11 @@ func (h *ScpUploadHandler) Write(s ssh.Session, entry *sendutils.FileEntry) (str
29 }
30 }
31
32- _ = h.Hooks.FileSuccess(s, &SuccesHook{
33+ _ = h.Hooks.FileSuccess(s, &fileshared.FileUploaded{
34 UserID: user.ID,
35- PostID: post.ID,
36 Action: action,
37 Filename: metadata.Filename,
38+ Service: h.Cfg.Space,
39 })
40 curl := shared.NewCreateURL(h.Cfg)
41 return h.Cfg.FullPostURL(curl, user.Name, metadata.Slug), nil
42@@ -307,11 +301,11 @@ func (h *ScpUploadHandler) Delete(s ssh.Session, entry *sendutils.FileEntry) err
43 logger.Error("post could not remove", "err", err.Error())
44 return fmt.Errorf("error for %s: %v", filename, err)
45 }
46- _ = h.Hooks.FileSuccess(s, &SuccesHook{
47+ _ = h.Hooks.FileSuccess(s, &fileshared.FileUploaded{
48 UserID: user.ID,
49- PostID: post.ID,
50 Action: "delete",
51 Filename: filename,
52+ Service: h.Cfg.Space,
53 })
54 return nil
55 }
1@@ -0,0 +1,57 @@
2+package fileshared
3+
4+import (
5+ "context"
6+ "encoding/json"
7+ "log/slog"
8+
9+ "github.com/picosh/pico/shared"
10+ pipeUtil "github.com/picosh/utils/pipe"
11+)
12+
13+type FileUploaded struct {
14+ UserID string `json:"user_id"`
15+ Action string `json:"action"`
16+ Filename string `json:"filename"`
17+ Service string `json:"service"`
18+ ProjectName string `json:"project_name"`
19+}
20+
21+func CreatePubUploadDrain(ctx context.Context, logger *slog.Logger) *pipeUtil.ReconnectReadWriteCloser {
22+ info := shared.NewPicoPipeClient()
23+ send := pipeUtil.NewReconnectReadWriteCloser(
24+ ctx,
25+ logger,
26+ info,
27+ "pub to upload-drain",
28+ "pub upload-drain -b=false",
29+ 100,
30+ -1,
31+ )
32+ return send
33+}
34+
35+func WriteUploadDrain(drain *pipeUtil.ReconnectReadWriteCloser, upload *FileUploaded) error {
36+ jso, err := json.Marshal(upload)
37+ if err != nil {
38+ return err
39+ }
40+
41+ jso = append(jso, '\n')
42+ _, err = drain.Write(jso)
43+ return err
44+}
45+
46+func CreateSubUploadDrain(ctx context.Context, logger *slog.Logger) *pipeUtil.ReconnectReadWriteCloser {
47+ info := shared.NewPicoPipeClient()
48+ send := pipeUtil.NewReconnectReadWriteCloser(
49+ ctx,
50+ logger,
51+ info,
52+ "sub to upload-drain",
53+ "sub upload-drain -k",
54+ 100,
55+ -1,
56+ )
57+ return send
58+}
M
go.mod
+2,
-2
1@@ -6,7 +6,7 @@ go 1.23.1
2
3 // replace github.com/picosh/send => ../send
4
5-// replace github.com/picosh/pobj => ../pobj
6+replace github.com/picosh/pobj => ../pobj
7
8 // replace github.com/picosh/pubsub => ../pubsub
9
10@@ -50,7 +50,7 @@ require (
11 github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
12 github.com/picosh/pobj v0.0.0-20250115045405-73c816ed76c2
13 github.com/picosh/pubsub v0.0.0-20241114191831-ec8f16c0eb88
14- github.com/picosh/send v0.0.0-20241218031305-056b1fe8ff80
15+ github.com/picosh/send v0.0.0-20250118030432-81b0df1712ac
16 github.com/picosh/tunkit v0.0.0-20240905223921-532404cef9d9
17 github.com/picosh/utils v0.0.0-20241120033529-8ca070c09bf4
18 github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06
M
go.sum
+2,
-4
1@@ -711,12 +711,10 @@ github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c h1:dAMKvw0MlJT1Gsh
2 github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
3 github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc h1:bvcsoOvaNHPquFnRkdraEo7+8t6bW7nWEhlALnwZPdI=
4 github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc/go.mod h1:i0iR3W4GSm1PuvVxB9OH32E5jP+CYkVb2NQSe0JCtlo=
5-github.com/picosh/pobj v0.0.0-20250115045405-73c816ed76c2 h1:fOz+o8pymr93p5OeJkehxkunWeFyVranWBsOmEE0OkI=
6-github.com/picosh/pobj v0.0.0-20250115045405-73c816ed76c2/go.mod h1:cF+eAl4G1vU+WOD8cYCKaxokHo6MWmbR8J4/SJnvESg=
7 github.com/picosh/pubsub v0.0.0-20241114191831-ec8f16c0eb88 h1:hdxE6rquHHw1/eeqS1b+ojLaxGtN8zOiTUclPwaVbPg=
8 github.com/picosh/pubsub v0.0.0-20241114191831-ec8f16c0eb88/go.mod h1:+9hDKIDHQCvGFigCVlIl589BwpT9R4boKhUVc/OgRU4=
9-github.com/picosh/send v0.0.0-20241218031305-056b1fe8ff80 h1:m0x9UOipmz0HCMNuhpzOgxRgOHefgNebmpcTwu0CwxU=
10-github.com/picosh/send v0.0.0-20241218031305-056b1fe8ff80/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
11+github.com/picosh/send v0.0.0-20250118030432-81b0df1712ac h1:KjP50VpXjwc6R52tpyd2TESMF6T5/dNrzu6TM3gw6P0=
12+github.com/picosh/send v0.0.0-20250118030432-81b0df1712ac/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
13 github.com/picosh/tunkit v0.0.0-20240905223921-532404cef9d9 h1:g5oZmnDFr11HarA8IAXcc4o9PBlolSM59QIATCSoato=
14 github.com/picosh/tunkit v0.0.0-20240905223921-532404cef9d9/go.mod h1:UrDH/VCIc1wg/L6iY2zSYt4TiGw+25GsKSnkVkU40Dw=
15 github.com/picosh/utils v0.0.0-20241120033529-8ca070c09bf4 h1:pwbgY9shKyMlpYvpUalTyV0ZVd5paj8pSEYT4OPOYTk=
+2,
-1
1@@ -10,6 +10,7 @@ import (
2 "github.com/charmbracelet/ssh"
3 "github.com/picosh/pico/db"
4 "github.com/picosh/pico/filehandlers"
5+ fileshared "github.com/picosh/pico/filehandlers/shared"
6 "github.com/picosh/pico/shared"
7 "github.com/picosh/utils"
8 )
9@@ -98,6 +99,6 @@ func (p *FileHooks) FileMeta(s ssh.Session, data *filehandlers.PostMetaData) err
10 return nil
11 }
12
13-func (p *FileHooks) FileSuccess(s ssh.Session, data *filehandlers.SuccesHook) error {
14+func (p *FileHooks) FileSuccess(s ssh.Session, data *fileshared.FileUploaded) error {
15 return nil
16 }
+34,
-1
1@@ -18,11 +18,13 @@ import (
2 "github.com/charmbracelet/ssh"
3 "github.com/charmbracelet/wish"
4 "github.com/picosh/pico/db"
5+ fileshared "github.com/picosh/pico/filehandlers/shared"
6 "github.com/picosh/pico/shared"
7 "github.com/picosh/pobj"
8 sst "github.com/picosh/pobj/storage"
9 sendutils "github.com/picosh/send/utils"
10 "github.com/picosh/utils"
11+ pipeutils "github.com/picosh/utils/pipe"
12 ignore "github.com/sabhiram/go-gitignore"
13 )
14
15@@ -103,17 +105,21 @@ type UploadAssetHandler struct {
16 Cfg *shared.ConfigSite
17 Storage sst.ObjectStorage
18 CacheClearingQueue chan string
19+ UploadDrain *pipeutils.ReconnectReadWriteCloser
20 }
21
22 func NewUploadAssetHandler(dbpool db.DB, cfg *shared.ConfigSite, storage sst.ObjectStorage, ctx context.Context) *UploadAssetHandler {
23 // Enable buffering so we don't slow down uploads.
24 ch := make(chan string, 100)
25 go runCacheQueue(cfg, ctx, ch)
26+ // publish all file uploads to a pipe topic
27+ drain := fileshared.CreatePubUploadDrain(ctx, cfg.Logger)
28 return &UploadAssetHandler{
29 DBPool: dbpool,
30 Cfg: cfg,
31 Storage: storage,
32 CacheClearingQueue: ch,
33+ UploadDrain: drain,
34 }
35 }
36
37@@ -412,7 +418,22 @@ func (h *UploadAssetHandler) Write(s ssh.Session, entry *sendutils.FileEntry) (s
38 surrogate := getSurrogateKey(user.Name, projectName)
39 h.CacheClearingQueue <- surrogate
40
41- return str, nil
42+ action := ""
43+ if curFileSize == 0 {
44+ action = "create"
45+ } else {
46+ action = "updated"
47+ }
48+ upload := &fileshared.FileUploaded{
49+ UserID: user.ID,
50+ Action: action,
51+ Filename: assetFilename,
52+ Service: h.Cfg.Space,
53+ ProjectName: projectName,
54+ }
55+ err = fileshared.WriteUploadDrain(h.UploadDrain, upload)
56+
57+ return str, err
58 }
59
60 func isSpecialFile(entry *sendutils.FileEntry) bool {
61@@ -482,6 +503,18 @@ func (h *UploadAssetHandler) Delete(s ssh.Session, entry *sendutils.FileEntry) e
62 surrogate := getSurrogateKey(user.Name, projectName)
63 h.CacheClearingQueue <- surrogate
64
65+ if err != nil {
66+ return err
67+ }
68+
69+ upload := &fileshared.FileUploaded{
70+ UserID: user.ID,
71+ Action: "delete",
72+ Filename: assetFilepath,
73+ Service: h.Cfg.Space,
74+ ProjectName: projectName,
75+ }
76+ err = fileshared.WriteUploadDrain(h.UploadDrain, upload)
77 return err
78 }
79
+3,
-9
1@@ -1,7 +1,6 @@
2 package prose
3
4 import (
5- "encoding/json"
6 "fmt"
7 "strings"
8
9@@ -10,6 +9,7 @@ import (
10 "github.com/charmbracelet/ssh"
11 "github.com/picosh/pico/db"
12 "github.com/picosh/pico/filehandlers"
13+ fileshared "github.com/picosh/pico/filehandlers/shared"
14 "github.com/picosh/pico/shared"
15 "github.com/picosh/utils"
16 pipeUtil "github.com/picosh/utils/pipe"
17@@ -76,12 +76,6 @@ func (p *MarkdownHooks) FileMeta(s ssh.Session, data *filehandlers.PostMetaData)
18 return nil
19 }
20
21-func (p *MarkdownHooks) FileSuccess(s ssh.Session, data *filehandlers.SuccesHook) error {
22- out, err := json.Marshal(data)
23- if err != nil {
24- return err
25- }
26- out = append(out, '\n')
27- _, err = p.Pipe.Write(out)
28- return err
29+func (p *MarkdownHooks) FileSuccess(s ssh.Session, data *fileshared.FileUploaded) error {
30+ return fileshared.WriteUploadDrain(p.Pipe, data)
31 }
+18,
-58
1@@ -683,16 +683,12 @@ func (ssg *SSG) Prose() error {
2 }
3
4 for _, user := range users {
5- if user.Name != "erock" {
6- continue
7- }
8-
9 bucket, err := ssg.Storage.UpsertBucket(shared.GetAssetBucketName(user.ID))
10 if err != nil {
11 return err
12 }
13
14- err = ssg.ProseBlog(user, bucket)
15+ err = ssg.ProseBlog(user, bucket, "prose")
16 if err != nil {
17 log := shared.LoggerWithUser(ssg.Logger, user)
18 log.Error("could not generate blog for user", "err", err)
19@@ -742,11 +738,12 @@ func (ssg *SSG) NotFoundPage(logger *slog.Logger, user *db.User, blog *UserBlogD
20 return nil
21 }
22
23-func (ssg *SSG) findPost(username string, bucket sst.Bucket, filename string, modTime time.Time) (*db.Post, error) {
24- updatedAt := modTime
25+func (ssg *SSG) UpsertPost(userID, username string, bucket sst.Bucket, filename string) (*db.Post, error) {
26+ slug := utils.SanitizeFileExt(filename)
27+ updatedAt := time.Now()
28 fp := filepath.Join("prose/", filename)
29 logger := ssg.Logger.With("filename", fp)
30- rdr, info, err := ssg.Storage.GetObject(bucket, fp)
31+ rdr, _, err := ssg.Storage.GetObject(bucket, fp)
32 if err != nil {
33 logger.Error("get object", "err", err)
34 return nil, err
35@@ -762,20 +759,8 @@ func (ssg *SSG) findPost(username string, bucket sst.Bucket, filename string, mo
36 logger.Error("parse text", "err", err)
37 return nil, err
38 }
39- if parsed.PublishAt == nil || parsed.PublishAt.IsZero() {
40- ca := info.Metadata.Get("Date")
41- if ca != "" {
42- dt, err := time.Parse(time.RFC1123, ca)
43- if err != nil {
44- return nil, err
45- }
46- parsed.PublishAt = &dt
47- }
48- }
49-
50- slug := utils.SanitizeFileExt(filename)
51
52- return &db.Post{
53+ post := &db.Post{
54 IsVirtual: true,
55 Slug: slug,
56 Filename: filename,
57@@ -787,47 +772,22 @@ func (ssg *SSG) findPost(username string, bucket sst.Bucket, filename string, mo
58 Description: parsed.Description,
59 Title: utils.FilenameToTitle(filename, parsed.Title),
60 Username: username,
61- }, nil
62+ }
63+
64+ origPost, _ := ssg.DB.FindPostWithSlug(slug, userID, "prose")
65+ if origPost != nil {
66+ post.PublishAt = origPost.PublishAt
67+ return ssg.DB.UpdatePost(post)
68+ }
69+ return ssg.DB.InsertPost(post)
70 }
71
72 func (ssg *SSG) findPostByName(userID, username string, bucket sst.Bucket, filename string, modTime time.Time) (*db.Post, error) {
73- post, err := ssg.findPost(username, bucket, filename, modTime)
74- if err == nil {
75- return post, nil
76- }
77 return ssg.DB.FindPostWithFilename(filename, userID, Space)
78 }
79
80-func (ssg *SSG) findPosts(blog *UserBlogData) ([]*db.Post, bool, error) {
81- posts := []*db.Post{}
82+func (ssg *SSG) findPosts(blog *UserBlogData, service string) ([]*db.Post, bool, error) {
83 blog.Logger.Info("finding posts")
84- objs, _ := ssg.Storage.ListObjects(blog.Bucket, "prose/", true)
85- if len(objs) > 0 {
86- blog.Logger.Info("found posts in bucket, using them")
87- }
88- for _, obj := range objs {
89- if obj.IsDir() {
90- continue
91- }
92-
93- ext := filepath.Ext(obj.Name())
94- if ext == ".md" {
95- post, err := ssg.findPost(blog.User.Name, blog.Bucket, obj.Name(), obj.ModTime())
96- if err != nil {
97- blog.Logger.Error("find post", "err", err, "filename", obj.Name())
98- continue
99- }
100- posts = append(posts, post)
101- }
102- }
103-
104- // we found markdown files in the pgs site so the assumption is
105- // the pgs site is now the source of truth and we can ignore the posts table
106- if len(posts) > 0 {
107- return posts, true, nil
108- }
109-
110- blog.Logger.Info("no posts found in bucket, using posts table")
111 data, err := ssg.DB.FindPostsForUser(&db.Pager{Num: 1000, Page: 0}, blog.User.ID, Space)
112 if err != nil {
113 return nil, false, err
114@@ -846,7 +806,7 @@ type UserBlogData struct {
115 Logger *slog.Logger
116 }
117
118-func (ssg *SSG) ProseBlog(user *db.User, bucket sst.Bucket) error {
119+func (ssg *SSG) ProseBlog(user *db.User, bucket sst.Bucket, service string) error {
120 // programmatically generate redirects file based on aliases
121 // and other routes that were in prose that need to be available
122 redirectsFile := "/rss /rss.atom 301\n"
123@@ -859,7 +819,7 @@ func (ssg *SSG) ProseBlog(user *db.User, bucket sst.Bucket) error {
124 if err != nil {
125 return err
126 }
127- return ssg.ProseBlog(user, bucket)
128+ return ssg.ProseBlog(user, bucket, service)
129 }
130
131 blog := &UserBlogData{
132@@ -868,7 +828,7 @@ func (ssg *SSG) ProseBlog(user *db.User, bucket sst.Bucket) error {
133 Logger: logger,
134 }
135
136- posts, isVirtual, err := ssg.findPosts(blog)
137+ posts, isVirtual, err := ssg.findPosts(blog, service)
138 if err != nil {
139 // no posts found, bail on generating an empty blog
140 // TODO: gen the index anyway?
+2,
-17
1@@ -3,7 +3,6 @@ package prose
2 import (
3 "context"
4 "fmt"
5- "log/slog"
6 "os"
7 "os/signal"
8 "syscall"
9@@ -15,6 +14,7 @@ import (
10 "github.com/picosh/pico/db/postgres"
11 "github.com/picosh/pico/filehandlers"
12 uploadimgs "github.com/picosh/pico/filehandlers/imgs"
13+ fileshared "github.com/picosh/pico/filehandlers/shared"
14 "github.com/picosh/pico/shared"
15 "github.com/picosh/pico/shared/storage"
16 wsh "github.com/picosh/pico/wish"
17@@ -26,7 +26,6 @@ import (
18 "github.com/picosh/send/protocols/sftp"
19 "github.com/picosh/send/proxy"
20 "github.com/picosh/utils"
21- pipeUtil "github.com/picosh/utils/pipe"
22 )
23
24 func createRouter(handler *filehandlers.FileHandlerRouter, cliHandler *CliHandler) proxy.Router {
25@@ -55,20 +54,6 @@ func withProxy(handler *filehandlers.FileHandlerRouter, cliHandler *CliHandler,
26 }
27 }
28
29-func createPubProseDrain(ctx context.Context, logger *slog.Logger) *pipeUtil.ReconnectReadWriteCloser {
30- info := shared.NewPicoPipeClient()
31- send := pipeUtil.NewReconnectReadWriteCloser(
32- ctx,
33- logger,
34- info,
35- "pub to prose-drain",
36- "pub prose-drain -b=false",
37- 100,
38- -1,
39- )
40- return send
41-}
42-
43 func StartSshServer() {
44 host := utils.GetEnv("PROSE_HOST", "0.0.0.0")
45 port := utils.GetEnv("PROSE_SSH_PORT", "2222")
46@@ -80,7 +65,7 @@ func StartSshServer() {
47
48 ctx := context.Background()
49 defer ctx.Done()
50- pipeClient := createPubProseDrain(ctx, logger)
51+ pipeClient := fileshared.CreatePubUploadDrain(ctx, logger)
52
53 hooks := &MarkdownHooks{
54 Cfg: cfg,