repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

commit
e1fca8b
parent
4ef30c4
author
Eric Bower
date
2025-08-07 22:21:35 -0400 EDT
feat(feeds): a wild `cron` property appears

Now users can specify a digest interval for their rss-to-email posts
using a cron format.

The primary limitation is we run our feed processor every minute so we
don't support the seconds specificity in cron.

Reference: https://github.com/adhocore/gronx?tab=readme-ov-file#cron-expression
6 files changed,  +66, -22
M go.mod
M go.sum
M go.mod
+1, -0
1@@ -23,6 +23,7 @@ toolchain go1.24.0
2 require (
3 	git.sr.ht/~delthas/senpai v0.4.0
4 	git.sr.ht/~rockorager/vaxis v0.14.1-0.20250527151737-5530f9f4bcf6
5+	github.com/adhocore/gronx v1.19.6
6 	github.com/alecthomas/chroma/v2 v2.15.0
7 	github.com/antoniomika/syncmap v1.0.0
8 	github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
M go.sum
+2, -0
1@@ -54,6 +54,8 @@ github.com/PuerkitoBio/goquery v1.10.2/go.mod h1:0guWGjcLu9AYC7C1GHnpysHy056u9aE
2 github.com/RoaringBitmap/roaring v1.2.1/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
3 github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
4 github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
5+github.com/adhocore/gronx v1.19.6 h1:5KNVcoR9ACgL9HhEqCm5QXsab/gI4QDIybTAWcXDKDc=
6+github.com/adhocore/gronx v1.19.6/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg=
7 github.com/alecthomas/assert/v2 v2.11.0 h1:2Q9r3ki8+JYXvGsDyBXwH3LcJ+WK5D0gc5E8vS6K3D0=
8 github.com/alecthomas/assert/v2 v2.11.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
9 github.com/alecthomas/chroma/v2 v2.2.0/go.mod h1:vf4zrexSH54oEjJ7EdB65tGNHmH3pGZmVkgTP5RHvAs=
M pkg/apps/feeds/cli.go
+10, -3
 1@@ -70,13 +70,20 @@ func Middleware(dbpool db.DB, cfg *shared.ConfigSite) pssh.SSHServerMiddleware {
 2 				_, _ = fmt.Fprintln(writer, "Filename\tLast Digest\tNext Digest\tInterval\tFailed Attempts")
 3 				for _, post := range posts.Data {
 4 					parsed := shared.ListParseText(post.Text)
 5-					digestOption := DigestOptionToTime(*post.Data.LastDigest, parsed.DigestInterval)
 6+
 7+					nextDigest := ""
 8+					if parsed.Cron != "" {
 9+						nextDigest = parsed.Cron
10+					} else {
11+						digestOption := DigestOptionToTime(*post.Data.LastDigest, parsed.DigestInterval)
12+						nextDigest = digestOption.Format(time.RFC3339)
13+					}
14 					_, _ = fmt.Fprintf(
15 						writer,
16 						"%s\t%s\t%s\t%s\t%d/10\r\n",
17 						post.Filename,
18 						post.Data.LastDigest.Format(time.RFC3339),
19-						digestOption.Format(time.RFC3339),
20+						nextDigest,
21 						parsed.DigestInterval,
22 						post.Data.Attempts,
23 					)
24@@ -123,7 +130,7 @@ func Middleware(dbpool db.DB, cfg *shared.ConfigSite) pssh.SSHServerMiddleware {
25 				}
26 				_, _ = fmt.Fprintf(sesh, "running feed post: %s\r\n", filename)
27 				fetcher := NewFetcher(dbpool, cfg)
28-				err = fetcher.RunPost(logger, user, post, true)
29+				err = fetcher.RunPost(logger, user, post, true, time.Now().UTC())
30 				if err != nil {
31 					_, _ = fmt.Fprintln(sesh.Stderr(), err)
32 				}
M pkg/apps/feeds/cron.go
+42, -18
  1@@ -15,6 +15,7 @@ import (
  2 	"text/template"
  3 	"time"
  4 
  5+	"github.com/adhocore/gronx"
  6 	"github.com/emersion/go-sasl"
  7 	"github.com/emersion/go-smtp"
  8 	"github.com/mmcdole/gofeed"
  9@@ -129,26 +130,34 @@ type Fetcher struct {
 10 	cfg  *shared.ConfigSite
 11 	db   db.DB
 12 	auth sasl.Client
 13+	gron *gronx.Gronx
 14 }
 15 
 16 func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
 17 	smtPass := os.Getenv("PICO_SMTP_PASS")
 18 	emailLogin := os.Getenv("PICO_SMTP_USER")
 19 	auth := sasl.NewPlainClient("", emailLogin, smtPass)
 20+	gron := gronx.New()
 21 	return &Fetcher{
 22 		db:   dbpool,
 23 		cfg:  cfg,
 24 		auth: auth,
 25+		gron: gron,
 26 	}
 27 }
 28 
 29-func (f *Fetcher) Validate(post *db.Post, parsed *shared.ListParsedText) error {
 30+func (f *Fetcher) Validate(post *db.Post, parsed *shared.ListParsedText, now time.Time) error {
 31 	lastDigest := post.Data.LastDigest
 32 	if lastDigest == nil {
 33 		return nil
 34 	}
 35 
 36-	now := time.Now().UTC()
 37+	toTheMin := time.Date(
 38+		now.Year(), now.Month(), now.Day(),
 39+		now.Hour(), now.Minute(),
 40+		0, 0, // zero out second and nano-second for cron
 41+		now.Location(),
 42+	)
 43 
 44 	expiresAt := post.ExpiresAt
 45 	if expiresAt != nil {
 46@@ -157,14 +166,29 @@ func (f *Fetcher) Validate(post *db.Post, parsed *shared.ListParsedText) error {
 47 		}
 48 	}
 49 
 50-	digestAt := DigestOptionToTime(*lastDigest, parsed.DigestInterval)
 51-	if digestAt.After(now) {
 52-		return fmt.Errorf("(%s) not time to digest, skipping", digestAt.Format(time.RFC3339))
 53+	if parsed.Cron != "" {
 54+		isDue, err := f.gron.IsDue(parsed.Cron, toTheMin)
 55+		if err != nil {
 56+			return fmt.Errorf("cron error, skipping; err: %w", err)
 57+		}
 58+		if !isDue {
 59+			nextTime, _ := gronx.NextTick(parsed.Cron, true)
 60+			return fmt.Errorf(
 61+				"cron not time to digest, skipping; cur run: %s, next run: %s",
 62+				f.gron.C.GetRef(),
 63+				nextTime,
 64+			)
 65+		}
 66+	} else if parsed.DigestInterval != "" {
 67+		digestAt := DigestOptionToTime(*lastDigest, parsed.DigestInterval)
 68+		if digestAt.After(now) {
 69+			return fmt.Errorf("(%s) not time to digest, skipping", digestAt.Format(time.RFC3339))
 70+		}
 71 	}
 72 	return nil
 73 }
 74 
 75-func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post, skipValidation bool) error {
 76+func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post, skipValidation bool, now time.Time) error {
 77 	logger = logger.With("filename", post.Filename)
 78 	logger.Info("running feed post")
 79 
 80@@ -178,8 +202,8 @@ func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post, ski
 81 		}
 82 	}
 83 
 84-	logger.Info("last digest at", "lastDigest", post.Data.LastDigest.Format(time.RFC3339))
 85-	err := f.Validate(post, parsed)
 86+	logger.Info("last digest", "timestamp", post.Data.LastDigest.Format(time.RFC3339))
 87+	err := f.Validate(post, parsed, now)
 88 	if err != nil {
 89 		logger.Info("validation failed", "err", err)
 90 		if skipValidation {
 91@@ -212,9 +236,8 @@ func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post, ski
 92 		urls = append(urls, u)
 93 	}
 94 
 95-	now := time.Now().UTC()
 96 	if post.ExpiresAt == nil {
 97-		expiresAt := time.Now().AddDate(0, 12, 0)
 98+		expiresAt := now.AddDate(0, 12, 0)
 99 		post.ExpiresAt = &expiresAt
100 	}
101 	_, err = f.db.UpdatePost(post)
102@@ -291,8 +314,9 @@ Also, we have centralized logs in our pico.sh TUI that will display realtime fee
103 	return nil
104 }
105 
106-func (f *Fetcher) RunUser(user *db.User) error {
107+func (f *Fetcher) RunUser(user *db.User, now time.Time) error {
108 	logger := shared.LoggerWithUser(f.cfg.Logger, user)
109+	logger.Info("run user")
110 	posts, err := f.db.FindPostsForUser(&db.Pager{Num: 100}, user.ID, "feeds")
111 	if err != nil {
112 		return err
113@@ -303,7 +327,7 @@ func (f *Fetcher) RunUser(user *db.User) error {
114 	}
115 
116 	for _, post := range posts.Data {
117-		err = f.RunPost(logger, user, post, false)
118+		err = f.RunPost(logger, user, post, false, now)
119 		if err != nil {
120 			logger.Error("run post failed", "err", err)
121 		}
122@@ -561,16 +585,16 @@ func (f *Fetcher) SendEmail(logger *slog.Logger, username, email, subject string
123 	return err
124 }
125 
126-func (f *Fetcher) Run(logger *slog.Logger) error {
127+func (f *Fetcher) Run(now time.Time) error {
128 	users, err := f.db.FindUsers()
129 	if err != nil {
130 		return err
131 	}
132 
133 	for _, user := range users {
134-		err := f.RunUser(user)
135+		err := f.RunUser(user, now)
136 		if err != nil {
137-			logger.Error("run user failed", "err", err)
138+			f.cfg.Logger.Error("run user failed", "err", err)
139 			continue
140 		}
141 	}
142@@ -583,12 +607,12 @@ func (f *Fetcher) Loop() {
143 	for {
144 		logger.Info("running digest emailer")
145 
146-		err := f.Run(logger)
147+		err := f.Run(time.Now().UTC())
148 		if err != nil {
149 			logger.Error("run failed", "err", err)
150 		}
151 
152-		logger.Info("digest emailer finished, waiting 10 mins")
153-		time.Sleep(10 * time.Minute)
154+		logger.Info("digest emailer finished, waiting 1min ...")
155+		time.Sleep(1 * time.Minute)
156 	}
157 }
M pkg/db/postgres/storage.go
+4, -1
 1@@ -1561,7 +1561,10 @@ func (me *PsqlDB) InsertFeedItems(postID string, items []*db.FeedItem) error {
 2 			item.Data,
 3 		)
 4 		if err != nil {
 5-			return fmt.Errorf("post id:%s, guid:%s, err:%w", item.PostID, item.GUID, err)
 6+			return fmt.Errorf(
 7+				"post id:%s, link:%s, guid:%s, err:%w",
 8+				item.PostID, item.Data.Link, item.GUID, err,
 9+			)
10 		}
11 	}
12 
M pkg/shared/listparser.go
+7, -0
 1@@ -11,6 +11,7 @@ import (
 2 
 3 	"slices"
 4 
 5+	"github.com/adhocore/gronx"
 6 	"github.com/araddon/dateparse"
 7 )
 8 
 9@@ -52,6 +53,7 @@ type ListMetaData struct {
10 	Tags           []string
11 	ListType       string // https://developer.mozilla.org/en-US/docs/Web/CSS/list-style-type
12 	DigestInterval string
13+	Cron           string
14 	Email          string
15 	InlineContent  bool // allows content inlining to be disabled in feeds.pico.sh emails
16 }
17@@ -130,6 +132,11 @@ func TokenToMetaField(meta *ListMetaData, token *SplitToken) error {
18 			)
19 		}
20 		meta.DigestInterval = token.Value
21+	case "cron":
22+		if !gronx.IsValid(token.Value) {
23+			return fmt.Errorf("(%s) is not in a valid cron format: https://github.com/adhocore/gronx?tab=readme-ov-file#cron-expression", token.Value)
24+		}
25+		meta.Cron = token.Value
26 	case "email":
27 		meta.Email = token.Value
28 	case "inline_content":