Eric Bower
·
2026-01-25
cron.go
1package feeds
2
3import (
4 "crypto/tls"
5 "errors"
6 "fmt"
7 html "html/template"
8 "io"
9 "log/slog"
10 "math"
11 "net/http"
12 "net/url"
13 "os"
14 "strings"
15 "text/template"
16 "time"
17
18 "github.com/adhocore/gronx"
19 "github.com/emersion/go-sasl"
20 "github.com/emersion/go-smtp"
21 "github.com/mmcdole/gofeed"
22 "github.com/picosh/pico/pkg/db"
23 "github.com/picosh/pico/pkg/shared"
24)
25
// ErrNoRecentArticles signals that a feed fetch succeeded but produced no
// items newer than what has already been emailed; callers treat it as a
// no-op rather than a failure (no attempt counter increment, no email).
var ErrNoRecentArticles = errors.New("no recent articles")
27
// UserAgentTransport wraps an http.RoundTripper so every outgoing request
// carries a fixed pico User-Agent and a wildcard Accept header.
type UserAgentTransport struct {
	http.RoundTripper
}
31
32func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
33 userAgent := "linux:feeds:v2 (by /u/pico-sh)"
34 r.Header.Set("User-Agent", userAgent)
35 r.Header.Set("Accept", "*/*")
36 return c.RoundTripper.RoundTrip(r)
37}
38
39var httpClient = http.Client{
40 Transport: &UserAgentTransport{
41 &http.Transport{
42 TLSClientConfig: &tls.Config{},
43 },
44 },
45}
46
// FeedItemTmpl is the template-facing view of a single feed item rendered
// in the digest email. Content and Description are html.HTML so the
// template renders them unescaped — they come straight from the remote
// feed, which the digest deliberately trusts.
type FeedItemTmpl struct {
	GUID        string
	Title       string
	Link        string
	PublishedAt *time.Time
	Content     html.HTML
	Description html.HTML
}
55
// Feed is one fetched feed plus the subset of its items that will appear
// in the digest: Items is the template-friendly view, FeedItems keeps the
// raw gofeed items so served-item bookkeeping can be persisted afterwards.
type Feed struct {
	Title       string
	Link        string
	Description string
	Items       []*FeedItemTmpl
	FeedItems   []*gofeed.Item
}
63
// DigestFeed is the root template context for a digest email.
type DigestFeed struct {
	Feeds        []*Feed
	Options      DigestOptions
	KeepAliveURL string // keep-alive link (presumably resets expiry — confirm handler)
	UnsubURL     string // one-click unsubscribe link for this post
	DaysLeft     string // days until the post auto-expires; empty when no expiry is set
	ShowBanner   bool   // set when the post is within 30 days of expiring
	SizeWarning  bool   // set when the rendered email exceeded the size cap
	IsPicoPlus   bool
}
74
// DigestOptions toggles rendering behavior for the digest templates.
type DigestOptions struct {
	// InlineContent controls whether full article content is embedded in
	// the email body; it is forced off when the rendered email is too big.
	InlineContent bool
}
78
79func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
80 return &FeedItemTmpl{
81 Title: item.Title,
82 Link: item.Link,
83 PublishedAt: item.PublishedParsed,
84 Description: html.HTML(item.Description),
85 Content: html.HTML(item.Content),
86 }
87}
88
// DigestIntervalToCron maps a human-friendly digest interval keyword to a
// five-field cron expression. The empty string and any unrecognized value
// fall back to the daily 13:00 schedule.
func DigestIntervalToCron(interval string) string {
	schedules := map[string]string{
		"10min":  "*/10 * * * *",
		"1hour":  "0 * * * *",
		"6hour":  "0 */6 * * *",
		"12hour": "0 */12 * * *",
		"1day":   "0 13 * * *",
		"7day":   "0 13 * * 0",
		"30day":  "0 13 1 * *",
	}
	if cron, ok := schedules[interval]; ok {
		return cron
	}
	// "" (unset) and unknown intervals digest daily at 13:00
	return "0 13 * * *"
}
109
110func getFeedItemID(logger *slog.Logger, item *gofeed.Item) string {
111 guid := strings.ToValidUTF8(item.GUID, "")
112 if item.GUID == "" {
113 logger.Info("no <guid> found for feed item, using <link> instead for its unique id")
114 return strings.ToValidUTF8(item.Link, "")
115 }
116 return guid
117}
118
119// see if this feed item should be emailed to user.
120func isValidItem(logger *slog.Logger, item *gofeed.Item, feedItems []*db.FeedItem) bool {
121 for _, feedItem := range feedItems {
122 if getFeedItemID(logger, item) == feedItem.GUID {
123 return false
124 }
125 }
126
127 return true
128}
129
// Fetcher periodically fetches every user's feed posts and emails digests.
type Fetcher struct {
	cfg  *shared.ConfigSite
	db   db.DB
	auth sasl.Client  // SMTP PLAIN credentials, read from env at construction
	gron *gronx.Gronx // cron expression validator/scheduler
	host string       // SMTP host:port from PICO_SMTP_HOST
}
137
138func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
139 host := os.Getenv("PICO_SMTP_HOST")
140 smtPass := os.Getenv("PICO_SMTP_PASS")
141 emailLogin := os.Getenv("PICO_SMTP_USER")
142
143 auth := sasl.NewPlainClient("", emailLogin, smtPass)
144 gron := gronx.New()
145 return &Fetcher{
146 db: dbpool,
147 cfg: cfg,
148 auth: auth,
149 gron: gron,
150 host: host,
151 }
152}
153
154func DateToMin(now time.Time) time.Time {
155 return time.Date(
156 now.Year(), now.Month(), now.Day(),
157 now.Hour(), now.Minute(),
158 0, 0, // zero out second and nano-second for cron
159 now.Location(),
160 )
161}
162
163func (f *Fetcher) Validate(logger *slog.Logger, post *db.Post, parsed *shared.ListParsedText, now time.Time) error {
164 expiresAt := post.ExpiresAt
165 if expiresAt != nil {
166 if post.ExpiresAt.Before(now) {
167 return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
168 }
169 }
170
171 cron := parsed.Cron
172 // support for posts with deprecated `digest_interval` property
173 if parsed.DigestInterval != "" {
174 cron = DigestIntervalToCron(parsed.DigestInterval)
175 }
176 logger.Info("found cron", "cron", cron)
177
178 if !f.gron.IsValid(cron) {
179 return fmt.Errorf("(%s) is invalid `cron`, skipping", cron)
180 }
181
182 dt := DateToMin(now)
183 isDue, err := f.gron.IsDue(cron, dt)
184 if err != nil {
185 return fmt.Errorf("cron error, skipping; err: %w", err)
186 }
187 if !isDue {
188 nextTime, _ := gronx.NextTickAfter(cron, dt, true)
189 return fmt.Errorf(
190 "cron not time to digest, skipping; cur run: %s, next run: %s",
191 f.gron.C.GetRef(),
192 nextTime,
193 )
194 }
195 return nil
196}
197
198func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, isPicoPlus bool, post *db.Post, skipValidation bool, now time.Time) error {
199 logger = logger.With("filename", post.Filename)
200 logger.Info("running feed post")
201
202 parsed := shared.ListParseText(post.Text)
203
204 if parsed.Email == "" {
205 logger.Error("post does not have an email associated, removing post")
206 err := f.db.RemovePosts([]string{post.ID})
207 if err != nil {
208 return err
209 }
210 }
211
212 if post.Data.LastDigest != nil {
213 logger.Info("last digest", "at", post.Data.LastDigest.Format(time.RFC3339))
214 } else {
215 logger.Info("first time running post")
216 }
217 err := f.Validate(logger, post, parsed, now)
218 if err != nil {
219 logger.Info("validation failed", "err", err)
220 if skipValidation {
221 logger.Info("overriding validation error, continuing")
222 } else {
223 return nil
224 }
225 }
226 logger.Info("validation success")
227
228 urls := []string{}
229 for _, item := range parsed.Items {
230 u := ""
231 if item.IsText || item.IsURL {
232 u = item.Value
233 } else if item.IsURL {
234 u = string(item.Value)
235 }
236
237 if u == "" {
238 continue
239 }
240
241 _, err := url.Parse(string(item.URL))
242 if err != nil {
243 logger.Info("invalid url", "url", string(item.URL))
244 continue
245 }
246
247 logger.Info("found rss feed url", "url", u)
248 urls = append(urls, u)
249 }
250
251 if post.ExpiresAt == nil {
252 expiresAt := now.AddDate(0, 12, 0)
253 post.ExpiresAt = &expiresAt
254 }
255 _, err = f.db.UpdatePost(post)
256 if err != nil {
257 return err
258 }
259
260 subject := fmt.Sprintf("%s feed digest", post.Filename)
261 unsubURL := getUnsubURL(post)
262
263 msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, isPicoPlus, post)
264 if err != nil {
265 errForUser := err
266
267 // we don't want to increment in this case
268 if errors.Is(errForUser, ErrNoRecentArticles) {
269 return nil
270 }
271
272 post.Data.Attempts += 1
273 logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)
274
275 maxAttempts := 10
276 errBody := fmt.Sprintf(`There was an error attempting to fetch your feeds (%d) times. After (%d) attempts we remove the file from our system. Please check all the URLs and re-upload.
277Also, we have centralized logs in our pico.sh TUI that will display realtime feed errors so you can debug.
278
279
280%s
281
282
283%s`, post.Data.Attempts, maxAttempts, errForUser.Error(), post.Text)
284 err = f.SendEmail(
285 logger,
286 user.Name,
287 parsed.Email,
288 subject,
289 unsubURL,
290 &MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
291 )
292 if err != nil {
293 return err
294 }
295
296 if post.Data.Attempts >= maxAttempts {
297 err = f.db.RemovePosts([]string{post.ID})
298 if err != nil {
299 return err
300 }
301 } else {
302 _, err = f.db.UpdatePost(post)
303 if err != nil {
304 return err
305 }
306 }
307 return errForUser
308 } else {
309 post.Data.Attempts = 0
310 _, err := f.db.UpdatePost(post)
311 if err != nil {
312 return err
313 }
314 }
315
316 if msgBody != nil {
317 err = f.SendEmail(logger, user.Name, parsed.Email, subject, unsubURL, msgBody)
318 if err != nil {
319 return err
320 }
321 }
322
323 post.Data.LastDigest = &now
324 _, err = f.db.UpdatePost(post)
325 if err != nil {
326 return err
327 }
328
329 return nil
330}
331
332func (f *Fetcher) RunUser(user *db.User, now time.Time) error {
333 logger := shared.LoggerWithUser(f.cfg.Logger, user)
334 logger.Info("run user")
335 posts, err := f.db.FindPostsByUser(&db.Pager{Num: 100}, user.ID, "feeds")
336 if err != nil {
337 return err
338 }
339 isPicoPlus := false
340 ff, _ := f.db.FindFeature(user.ID, "plus")
341 if ff != nil {
342 if ff.IsValid() {
343 isPicoPlus = true
344 }
345 }
346
347 if len(posts.Data) > 0 {
348 logger.Info("found feed posts", "len", len(posts.Data))
349 }
350
351 for _, post := range posts.Data {
352 err = f.RunPost(logger, user, isPicoPlus, post, false, now)
353 if err != nil {
354 logger.Error("run post failed", "err", err)
355 }
356 }
357
358 return nil
359}
360
361func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
362 req, err := http.NewRequest("GET", url, nil)
363 if err != nil {
364 return nil, err
365 }
366
367 resp, err := httpClient.Do(req)
368 if err != nil {
369 return nil, err
370 }
371
372 defer func() {
373 _ = resp.Body.Close()
374 }()
375 body, err := io.ReadAll(resp.Body)
376 if err != nil {
377 return nil, err
378 }
379
380 if resp.StatusCode < 200 || resp.StatusCode > 300 {
381 return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
382 }
383
384 feed, err := fp.ParseString(string(body))
385 if err != nil {
386 return nil, err
387 }
388
389 return feed, nil
390}
391
392func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
393 logger.Info("fetching feed", "url", url)
394
395 feed, err := f.ParseURL(fp, url)
396 if err != nil {
397 return nil, err
398 }
399
400 feedTmpl := &Feed{
401 Title: feed.Title,
402 Description: feed.Description,
403 Link: feed.Link,
404 }
405
406 items := []*FeedItemTmpl{}
407 gofeedItems := []*gofeed.Item{}
408 // we only want to return feed items published since the last digest time we fetched
409 for _, item := range feed.Items {
410 if item == nil {
411 continue
412 }
413
414 if !isValidItem(logger, item, feedItems) {
415 logger.Info("feed item already served", "guid", item.GUID)
416 continue
417 }
418
419 gofeedItems = append(gofeedItems, item)
420 items = append(items, itemToTemplate(item))
421 }
422
423 if len(items) == 0 {
424 return nil, fmt.Errorf(
425 "%s %w, skipping",
426 url,
427 ErrNoRecentArticles,
428 )
429 }
430
431 feedTmpl.FeedItems = gofeedItems
432 feedTmpl.Items = items
433 return feedTmpl, nil
434}
435
436func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
437 ts, err := template.ParseFiles(
438 f.cfg.StaticPath("html/digest_text.page.tmpl"),
439 )
440
441 if err != nil {
442 return "", err
443 }
444
445 w := new(strings.Builder)
446 err = ts.Execute(w, feedTmpl)
447 if err != nil {
448 return "", err
449 }
450
451 return w.String(), nil
452}
453
454func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
455 ts, err := html.ParseFiles(
456 f.cfg.StaticPath("html/digest.page.tmpl"),
457 )
458
459 if err != nil {
460 return "", err
461 }
462
463 w := new(strings.Builder)
464 err = ts.Execute(w, feedTmpl)
465 if err != nil {
466 return "", err
467 }
468
469 return w.String(), nil
470}
471
// MsgBody holds the two MIME "multipart/alternative" bodies of a digest
// email: the HTML flavor and the plain-text fallback.
type MsgBody struct {
	Html string
	Text string
}
476
477func getUnsubURL(post *db.Post) string {
478 return fmt.Sprintf("https://feeds.pico.sh/unsub/%s", post.ID)
479}
480
481func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, isPicoPlus bool, post *db.Post) (*MsgBody, error) {
482 logger.Info("fetching feeds", "inlineContent", inlineContent)
483 fp := gofeed.NewParser()
484 daysLeft := ""
485 showBanner := false
486 if post.ExpiresAt != nil {
487 diff := time.Until(*post.ExpiresAt)
488 daysLeftInt := int(math.Ceil(diff.Hours() / 24))
489 daysLeft = fmt.Sprintf("%d", daysLeftInt)
490 if daysLeftInt <= 30 {
491 showBanner = true
492 }
493 }
494 feeds := &DigestFeed{
495 KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
496 UnsubURL: getUnsubURL(post),
497 DaysLeft: daysLeft,
498 ShowBanner: showBanner,
499 Options: DigestOptions{InlineContent: inlineContent},
500 IsPicoPlus: isPicoPlus,
501 }
502 feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
503 if err != nil {
504 return nil, err
505 }
506
507 if len(urls) == 0 {
508 return nil, fmt.Errorf("feed file does not contain any urls")
509 }
510
511 var allErrors error
512 for _, url := range urls {
513 feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
514 if err != nil {
515 if errors.Is(err, ErrNoRecentArticles) {
516 logger.Info("no recent articles", "err", err)
517 } else {
518 allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
519 logger.Error("fetch error", "err", err)
520 }
521 continue
522 }
523 feeds.Feeds = append(feeds.Feeds, feedTmpl)
524 }
525
526 if len(feeds.Feeds) == 0 {
527 if allErrors != nil {
528 return nil, allErrors
529 }
530 return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
531 }
532
533 fdi := []*db.FeedItem{}
534 for _, feed := range feeds.Feeds {
535 for _, item := range feed.FeedItems {
536 uid := getFeedItemID(logger, item)
537 fdi = append(fdi, &db.FeedItem{
538 PostID: post.ID,
539 GUID: uid,
540 Data: db.FeedItemData{
541 Title: strings.ToValidUTF8(item.Title, ""),
542 Description: strings.ToValidUTF8(item.Description, ""),
543 Content: strings.ToValidUTF8(item.Content, ""),
544 Link: item.Link,
545 PublishedAt: item.PublishedParsed,
546 },
547 })
548 }
549 }
550 err = f.db.InsertFeedItems(post.ID, fdi)
551 if err != nil {
552 return nil, err
553 }
554
555 text, err := f.PrintText(feeds)
556 if err != nil {
557 return nil, err
558 }
559
560 html, err := f.PrintHtml(feeds)
561 if err != nil {
562 return nil, err
563 }
564
565 // cap body size to prevent abuse
566 if len(html)+len(text) > 5*shared.MB {
567 feeds.Options.InlineContent = false
568 feeds.SizeWarning = true
569 html, err = f.PrintHtml(feeds)
570 if err != nil {
571 return nil, err
572 }
573 }
574
575 if allErrors != nil {
576 text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
577 html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
578 }
579
580 return &MsgBody{
581 Text: text,
582 Html: html,
583 }, nil
584}
585
586func (f *Fetcher) SendEmail(logger *slog.Logger, username, email, subject, unsubURL string, msg *MsgBody) error {
587 if email == "" {
588 return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
589 }
590 smtpAddr := f.host
591 fromEmail := "hello@pico.sh"
592 to := []string{email}
593 headers := map[string]string{
594 "From": fromEmail,
595 "Subject": subject,
596 "To": email,
597 "MIME-Version": "1.0",
598 "Content-Type": `multipart/alternative; boundary="boundary123"`,
599 "List-Unsubscribe": "<" + unsubURL + ">",
600 }
601 var content strings.Builder
602 for k, v := range headers {
603 content.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
604 }
605 content.WriteString("\r\n")
606 content.WriteString("\r\n--boundary123\r\n")
607 content.WriteString("Content-Type: text/plain; charset=\"utf-8\"\r\n")
608 content.WriteString("\r\n" + msg.Text + "\r\n")
609 content.WriteString("--boundary123\r\n")
610 content.WriteString("Content-Type: text/html; charset=\"utf-8\"\r\n")
611 content.WriteString("\r\n" + msg.Html + "\r\n")
612 content.WriteString("--boundary123--")
613
614 reader := strings.NewReader(content.String())
615 logger.Info("sending email digest")
616 err := smtp.SendMail(
617 smtpAddr,
618 f.auth,
619 fromEmail,
620 to,
621 reader,
622 )
623 return err
624}
625
626func (f *Fetcher) Run(now time.Time) error {
627 users, err := f.db.FindUsersWithPost("feeds")
628 if err != nil {
629 return err
630 }
631
632 for _, user := range users {
633 err := f.RunUser(user, now)
634 if err != nil {
635 f.cfg.Logger.Error("run user failed", "err", err)
636 continue
637 }
638 }
639
640 return nil
641}
642
// Loop runs the digest emailer forever, sleeping one minute between
// passes. Due-ness is evaluated at minute granularity (see DateToMin), so
// a 1-minute poll keeps cron schedules from being skipped.
// NOTE(review): there is no cancellation path — callers are expected to
// run this for the life of the process; confirm if a ctx is ever needed.
func (f *Fetcher) Loop() {
	logger := f.cfg.Logger
	for {
		logger.Info("running digest emailer")

		err := f.Run(time.Now().UTC())
		if err != nil {
			logger.Error("run failed", "err", err)
		}

		logger.Info("digest emailer finished, waiting 1min ...")
		time.Sleep(1 * time.Minute)
	}
}