Eric Bower
·
2026-03-08
cron.go
1package feeds
2
3import (
4 "crypto/tls"
5 "errors"
6 "fmt"
7 html "html/template"
8 "io"
9 "log/slog"
10 "math"
11 "net/http"
12 "net/url"
13 "os"
14 "strings"
15 "text/template"
16 "time"
17
18 "github.com/adhocore/gronx"
19 "github.com/emersion/go-sasl"
20 "github.com/emersion/go-smtp"
21 "github.com/mmcdole/gofeed"
22 "github.com/picosh/pico/pkg/db"
23 "github.com/picosh/pico/pkg/shared"
24)
25
// ErrNoRecentArticles signals that a feed (or an entire digest run) had no
// new items since the last digest. Callers treat it as a benign no-op:
// RunPost skips the email and does not increment the failure counter.
var ErrNoRecentArticles = errors.New("no recent articles")
27
28type UserAgentTransport struct {
29 http.RoundTripper
30}
31
32func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
33 userAgent := "linux:feeds:v2 (by /u/pico-sh)"
34 r.Header.Set("User-Agent", userAgent)
35 r.Header.Set("Accept", "*/*")
36 return c.RoundTripper.RoundTrip(r)
37}
38
39var httpClient = http.Client{
40 Transport: &UserAgentTransport{
41 &http.Transport{
42 TLSClientConfig: &tls.Config{},
43 },
44 },
45}
46
// FeedItemTmpl is a single article as rendered by the digest templates.
type FeedItemTmpl struct {
	// GUID is the item's unique id.
	// NOTE(review): itemToTemplate never populates this field — confirm the
	// templates do not rely on it before removing or wiring it up.
	GUID        string
	Title       string
	Link        string
	PublishedAt *time.Time
	// Content and Description are html.HTML, so templates render them
	// unescaped — they carry raw feed markup.
	Content     html.HTML
	Description html.HTML
}
55
// Feed pairs a fetched feed's metadata with the new items that will be
// included in the digest email.
type Feed struct {
	Title       string
	Link        string
	Description string
	// Items are the template-ready representations of FeedItems.
	Items []*FeedItemTmpl
	// FeedItems are the raw parsed items; FetchAll persists their GUIDs to
	// the db after a send so they are not re-emailed.
	FeedItems []*gofeed.Item
}
63
// DigestFeed is the root template context for a digest email.
type DigestFeed struct {
	Feeds   []*Feed
	Options DigestOptions
	// KeepAliveURL lets the user extend the post's expiration.
	KeepAliveURL string
	// UnsubURL is the one-click unsubscribe link (also sent as a header).
	UnsubURL string
	// DaysLeft is the number of days until the post expires, as text.
	DaysLeft string
	// ShowBanner is set within 30 days of expiration.
	ShowBanner bool
	// SizeWarning is set when the rendered email exceeded the size cap and
	// inline content was dropped.
	SizeWarning bool
	IsPicoPlus  bool
}
74
// DigestOptions controls how the digest templates render.
type DigestOptions struct {
	// InlineContent presumably embeds full article bodies rather than just
	// links; FetchAll disables it when the rendered email exceeds the size
	// cap. Exact rendering semantics live in the templates.
	InlineContent bool
}
78
79func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
80 return &FeedItemTmpl{
81 Title: item.Title,
82 Link: item.Link,
83 PublishedAt: item.PublishedParsed,
84 Description: html.HTML(item.Description),
85 Content: html.HTML(item.Content),
86 }
87}
88
// DigestIntervalToCron maps a legacy `digest_interval` value to a cron
// expression. Unknown values, "1day", and the empty string all fall back
// to the default daily schedule (13:00 UTC).
func DigestIntervalToCron(interval string) string {
	const daily = "0 13 * * *"
	switch interval {
	case "10min":
		return "*/10 * * * *"
	case "1hour":
		return "0 * * * *"
	case "6hour":
		return "0 */6 * * *"
	case "12hour":
		return "0 */12 * * *"
	case "7day":
		return "0 13 * * 0"
	case "30day":
		return "0 13 1 * *"
	default:
		// covers "1day", "", and anything unrecognized
		return daily
	}
}
109
110func getFeedItemID(logger *slog.Logger, item *gofeed.Item) string {
111 guid := strings.ToValidUTF8(item.GUID, "")
112 if item.GUID == "" {
113 logger.Info("no <guid> found for feed item, using <link> instead for its unique id")
114 return strings.ToValidUTF8(item.Link, "")
115 }
116 return guid
117}
118
119// see if this feed item should be emailed to user.
120func isValidItem(logger *slog.Logger, item *gofeed.Item, feedItems []*db.FeedItem) bool {
121 for _, feedItem := range feedItems {
122 if getFeedItemID(logger, item) == feedItem.GUID {
123 return false
124 }
125 }
126
127 return true
128}
129
// Fetcher runs the feed digest cron: it finds users' feed posts, fetches
// their rss/atom feeds, renders digests, and emails them over SMTP.
type Fetcher struct {
	cfg *shared.ConfigSite
	db  db.DB
	// auth is the SMTP PLAIN auth built from PICO_SMTP_USER/PICO_SMTP_PASS.
	auth sasl.Client
	// gron validates cron expressions and decides when a post is due.
	gron *gronx.Gronx
	// host is the SMTP server address (PICO_SMTP_HOST).
	host string
}
137
138func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
139 host := os.Getenv("PICO_SMTP_HOST")
140 smtPass := os.Getenv("PICO_SMTP_PASS")
141 emailLogin := os.Getenv("PICO_SMTP_USER")
142
143 auth := sasl.NewPlainClient("", emailLogin, smtPass)
144 gron := gronx.New()
145 return &Fetcher{
146 db: dbpool,
147 cfg: cfg,
148 auth: auth,
149 gron: gron,
150 host: host,
151 }
152}
153
154func DateToMin(now time.Time) time.Time {
155 return time.Date(
156 now.Year(), now.Month(), now.Day(),
157 now.Hour(), now.Minute(),
158 0, 0, // zero out second and nano-second for cron
159 now.Location(),
160 )
161}
162
163func (f *Fetcher) Validate(logger *slog.Logger, post *db.Post, parsed *shared.ListParsedText, now time.Time) error {
164 expiresAt := post.ExpiresAt
165 if expiresAt != nil {
166 if post.ExpiresAt.Before(now) {
167 return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
168 }
169 }
170
171 cron := parsed.Cron
172 // support for posts with deprecated `digest_interval` property
173 if parsed.DigestInterval != "" {
174 cron = DigestIntervalToCron(parsed.DigestInterval)
175 }
176 logger.Info("found cron", "cron", cron)
177
178 if !f.gron.IsValid(cron) {
179 return fmt.Errorf("(%s) is invalid `cron`, skipping", cron)
180 }
181
182 dt := DateToMin(now)
183 isDue, err := f.gron.IsDue(cron, dt)
184 if err != nil {
185 return fmt.Errorf("cron error, skipping; err: %w", err)
186 }
187 if !isDue {
188 nextTime, _ := gronx.NextTickAfter(cron, dt, true)
189 return fmt.Errorf(
190 "cron not time to digest, skipping; cur run: %s, next run: %s",
191 f.gron.C.GetRef(),
192 nextTime,
193 )
194 }
195 return nil
196}
197
198func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, isPicoPlus bool, post *db.Post, skipValidation bool, now time.Time) error {
199 logger = logger.With("filename", post.Filename)
200 logger.Info("running feed post")
201
202 parsed := shared.ListParseText(post.Text)
203
204 if parsed.Email == "" {
205 logger.Error("post does not have an email associated, removing post")
206 err := f.db.RemovePosts([]string{post.ID})
207 if err != nil {
208 return err
209 }
210 }
211
212 if post.Data.LastDigest != nil {
213 logger.Info("last digest", "at", post.Data.LastDigest.Format(time.RFC3339))
214 } else {
215 logger.Info("first time running post")
216 }
217 err := f.Validate(logger, post, parsed, now)
218 if err != nil {
219 logger.Info("validation failed", "err", err)
220 if skipValidation {
221 logger.Info("overriding validation error, continuing")
222 } else {
223 return nil
224 }
225 }
226 logger.Info("validation success")
227
228 urls := []string{}
229 for _, item := range parsed.Items {
230 rawUrl := ""
231 if item.IsText {
232 rawUrl = string(item.Value)
233 } else if item.IsURL {
234 rawUrl = string(item.URL)
235 }
236
237 if rawUrl == "" {
238 continue
239 }
240
241 _, err := url.Parse(rawUrl)
242 if err != nil {
243 logger.Info("invalid url", "url", rawUrl)
244 continue
245 }
246
247 logger.Info("found rss feed url", "url", rawUrl)
248 urls = append(urls, rawUrl)
249 }
250
251 if post.ExpiresAt == nil {
252 expiresAt := now.AddDate(0, 12, 0)
253 post.ExpiresAt = &expiresAt
254 }
255 _, err = f.db.UpdatePost(post)
256 if err != nil {
257 return err
258 }
259
260 subject := fmt.Sprintf("%s feed digest", post.Filename)
261 unsubURL := getUnsubURL(post)
262
263 msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, isPicoPlus, post)
264 if err != nil {
265 errForUser := err
266
267 // we don't want to increment in this case
268 if errors.Is(errForUser, ErrNoRecentArticles) {
269 return nil
270 }
271
272 post.Data.Attempts += 1
273 logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)
274
275 maxAttempts := 100
276 errBody := fmt.Sprintf(`There was an error attempting to fetch your feeds (%d) times. After (%d) attempts we remove the file from our system. Please check all the URLs and re-upload.
277Also, we have centralized logs in our pico.sh TUI that will display realtime feed errors so you can debug.
278
279
280%s
281
282
283%s`, post.Data.Attempts, maxAttempts, errForUser.Error(), post.Text)
284 err = f.SendEmail(
285 logger,
286 user.Name,
287 parsed.Email,
288 subject,
289 unsubURL,
290 &MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
291 )
292 if err != nil {
293 return err
294 }
295
296 if post.Data.Attempts >= maxAttempts {
297 err = f.db.RemovePosts([]string{post.ID})
298 if err != nil {
299 return err
300 }
301 } else {
302 _, err = f.db.UpdatePost(post)
303 if err != nil {
304 return err
305 }
306 }
307 return errForUser
308 } else {
309 post.Data.Attempts = 0
310 _, err := f.db.UpdatePost(post)
311 if err != nil {
312 return err
313 }
314 }
315
316 if msgBody != nil {
317 err = f.SendEmail(logger, user.Name, parsed.Email, subject, unsubURL, msgBody)
318 if err != nil {
319 return err
320 }
321 }
322
323 post.Data.LastDigest = &now
324 _, err = f.db.UpdatePost(post)
325 if err != nil {
326 return err
327 }
328
329 return nil
330}
331
332func (f *Fetcher) RunUser(user *db.User, now time.Time) error {
333 logger := shared.LoggerWithUser(f.cfg.Logger, user)
334 logger.Info("run user")
335 posts, err := f.db.FindPostsByUser(&db.Pager{Num: 100}, user.ID, "feeds")
336 if err != nil {
337 return err
338 }
339 isPicoPlus := false
340 ff, _ := f.db.FindFeature(user.ID, "plus")
341 if ff != nil {
342 if ff.IsValid() {
343 isPicoPlus = true
344 }
345 }
346
347 if !isPicoPlus {
348 return fmt.Errorf("must be pico+ user to receive rss digests")
349 }
350
351 if len(posts.Data) > 0 {
352 logger.Info("found feed posts", "len", len(posts.Data))
353 }
354
355 for _, post := range posts.Data {
356 err = f.RunPost(logger, user, isPicoPlus, post, false, now)
357 if err != nil {
358 logger.Error("run post failed", "err", err)
359 }
360 }
361
362 return nil
363}
364
365func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
366 req, err := http.NewRequest("GET", url, nil)
367 if err != nil {
368 return nil, err
369 }
370
371 resp, err := httpClient.Do(req)
372 if err != nil {
373 return nil, err
374 }
375
376 defer func() {
377 _ = resp.Body.Close()
378 }()
379 body, err := io.ReadAll(resp.Body)
380 if err != nil {
381 return nil, err
382 }
383
384 if resp.StatusCode < 200 || resp.StatusCode > 300 {
385 return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
386 }
387
388 feed, err := fp.ParseString(string(body))
389 if err != nil {
390 return nil, err
391 }
392
393 return feed, nil
394}
395
396func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
397 logger.Info("fetching feed", "url", url)
398
399 feed, err := f.ParseURL(fp, url)
400 if err != nil {
401 return nil, err
402 }
403
404 feedTmpl := &Feed{
405 Title: feed.Title,
406 Description: feed.Description,
407 Link: feed.Link,
408 }
409
410 items := []*FeedItemTmpl{}
411 gofeedItems := []*gofeed.Item{}
412 // we only want to return feed items published since the last digest time we fetched
413 for _, item := range feed.Items {
414 if item == nil {
415 continue
416 }
417
418 if !isValidItem(logger, item, feedItems) {
419 logger.Info("feed item already served", "guid", item.GUID)
420 continue
421 }
422
423 gofeedItems = append(gofeedItems, item)
424 items = append(items, itemToTemplate(item))
425 }
426
427 if len(items) == 0 {
428 return nil, fmt.Errorf(
429 "%s %w, skipping",
430 url,
431 ErrNoRecentArticles,
432 )
433 }
434
435 feedTmpl.FeedItems = gofeedItems
436 feedTmpl.Items = items
437 return feedTmpl, nil
438}
439
440func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
441 ts, err := template.ParseFiles(
442 f.cfg.StaticPath("html/digest_text.page.tmpl"),
443 )
444
445 if err != nil {
446 return "", err
447 }
448
449 w := new(strings.Builder)
450 err = ts.Execute(w, feedTmpl)
451 if err != nil {
452 return "", err
453 }
454
455 return w.String(), nil
456}
457
458func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
459 ts, err := html.ParseFiles(
460 f.cfg.StaticPath("html/digest.page.tmpl"),
461 )
462
463 if err != nil {
464 return "", err
465 }
466
467 w := new(strings.Builder)
468 err = ts.Execute(w, feedTmpl)
469 if err != nil {
470 return "", err
471 }
472
473 return w.String(), nil
474}
475
// MsgBody holds the two alternative bodies of a multipart digest email.
type MsgBody struct {
	Html string
	Text string
}
480
481func getUnsubURL(post *db.Post) string {
482 return fmt.Sprintf("https://feeds.pico.sh/unsub/%s", post.ID)
483}
484
// FetchAll fetches every feed url for a post, renders the digest email
// bodies (text + html), and persists which feed items were included so
// they are not re-sent on the next run. It returns ErrNoRecentArticles
// (wrapped) when no feed produced anything new, and the joined per-url
// errors when every url failed.
func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, isPicoPlus bool, post *db.Post) (*MsgBody, error) {
	logger.Info("fetching feeds", "inlineContent", inlineContent)
	fp := gofeed.NewParser()
	// surface an expiration-warning banner during the last 30 days
	daysLeft := ""
	showBanner := false
	if post.ExpiresAt != nil {
		diff := time.Until(*post.ExpiresAt)
		daysLeftInt := int(math.Ceil(diff.Hours() / 24))
		daysLeft = fmt.Sprintf("%d", daysLeftInt)
		if daysLeftInt <= 30 {
			showBanner = true
		}
	}
	feeds := &DigestFeed{
		KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
		UnsubURL:     getUnsubURL(post),
		DaysLeft:     daysLeft,
		ShowBanner:   showBanner,
		Options:      DigestOptions{InlineContent: inlineContent},
		IsPicoPlus:   isPicoPlus,
	}
	// previously-served items, used to filter out articles already emailed
	feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
	if err != nil {
		return nil, err
	}

	if len(urls) == 0 {
		return nil, fmt.Errorf("feed file does not contain any urls")
	}

	var allErrors error
	for _, url := range urls {
		feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
		if err != nil {
			// "no recent articles" is benign; real errors are joined and
			// later prepended to the digest so the user can fix broken urls
			if errors.Is(err, ErrNoRecentArticles) {
				logger.Info("no recent articles", "err", err)
			} else {
				allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
				logger.Error("fetch error", "err", err)
			}
			continue
		}
		feeds.Feeds = append(feeds.Feeds, feedTmpl)
	}

	if len(feeds.Feeds) == 0 {
		// every url failed or had nothing new
		if allErrors != nil {
			return nil, allErrors
		}
		return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
	}

	// persist the items we are about to email so future runs skip them
	fdi := []*db.FeedItem{}
	for _, feed := range feeds.Feeds {
		for _, item := range feed.FeedItems {
			uid := getFeedItemID(logger, item)
			fdi = append(fdi, &db.FeedItem{
				PostID: post.ID,
				GUID:   uid,
				Data: db.FeedItemData{
					Title:       strings.ToValidUTF8(item.Title, ""),
					Description: strings.ToValidUTF8(item.Description, ""),
					Content:     strings.ToValidUTF8(item.Content, ""),
					Link:        item.Link,
					PublishedAt: item.PublishedParsed,
				},
			})
		}
	}
	err = f.db.InsertFeedItems(post.ID, fdi)
	if err != nil {
		return nil, err
	}

	text, err := f.PrintText(feeds)
	if err != nil {
		return nil, err
	}

	html, err := f.PrintHtml(feeds)
	if err != nil {
		return nil, err
	}

	// cap body size to prevent abuse
	if len(html)+len(text) > 5*shared.MB {
		// too big: drop inline article content and re-render with a warning
		feeds.Options.InlineContent = false
		feeds.SizeWarning = true
		html, err = f.PrintHtml(feeds)
		if err != nil {
			return nil, err
		}
	}

	// prepend any per-feed fetch errors to both bodies
	if allErrors != nil {
		text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
		html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
	}

	return &MsgBody{
		Text: text,
		Html: html,
	}, nil
}
589
590func (f *Fetcher) SendEmail(logger *slog.Logger, username, email, subject, unsubURL string, msg *MsgBody) error {
591 if email == "" {
592 return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
593 }
594 smtpAddr := f.host
595 fromEmail := "hello@pico.sh"
596 to := []string{email}
597 headers := map[string]string{
598 "From": fromEmail,
599 "Subject": subject,
600 "To": email,
601 "MIME-Version": "1.0",
602 "Content-Type": `multipart/alternative; boundary="boundary123"`,
603 "List-Unsubscribe": "<" + unsubURL + ">",
604 }
605 var content strings.Builder
606 for k, v := range headers {
607 fmt.Fprintf(&content, "%s: %s\r\n", k, v)
608 }
609 content.WriteString("\r\n")
610 content.WriteString("\r\n--boundary123\r\n")
611 content.WriteString("Content-Type: text/plain; charset=\"utf-8\"\r\n")
612 content.WriteString("\r\n" + msg.Text + "\r\n")
613 content.WriteString("--boundary123\r\n")
614 content.WriteString("Content-Type: text/html; charset=\"utf-8\"\r\n")
615 content.WriteString("\r\n" + msg.Html + "\r\n")
616 content.WriteString("--boundary123--")
617
618 reader := strings.NewReader(content.String())
619 logger.Info("sending email digest")
620 err := smtp.SendMail(
621 smtpAddr,
622 f.auth,
623 fromEmail,
624 to,
625 reader,
626 )
627 return err
628}
629
630func (f *Fetcher) Run(now time.Time) error {
631 users, err := f.db.FindUsersWithPost("feeds")
632 if err != nil {
633 return err
634 }
635
636 for _, user := range users {
637 err := f.RunUser(user, now)
638 if err != nil {
639 f.cfg.Logger.Error("run user failed", "err", err)
640 continue
641 }
642 }
643
644 return nil
645}
646
647func (f *Fetcher) Loop() {
648 logger := f.cfg.Logger
649 for {
650 logger.Info("running digest emailer")
651
652 err := f.Run(time.Now().UTC())
653 if err != nil {
654 logger.Error("run failed", "err", err)
655 }
656
657 logger.Info("digest emailer finished, waiting 1min ...")
658 time.Sleep(1 * time.Minute)
659 }
660}