Eric Bower
·
2025-08-30
cron.go
1package feeds
2
3import (
4 "crypto/tls"
5 "errors"
6 "fmt"
7 html "html/template"
8 "io"
9 "log/slog"
10 "math"
11 "net/http"
12 "net/url"
13 "os"
14 "strings"
15 "text/template"
16 "time"
17
18 "github.com/adhocore/gronx"
19 "github.com/emersion/go-sasl"
20 "github.com/emersion/go-smtp"
21 "github.com/mmcdole/gofeed"
22 "github.com/picosh/pico/pkg/db"
23 "github.com/picosh/pico/pkg/shared"
24 "github.com/picosh/utils"
25)
26
// ErrNoRecentArticles signals that a feed was fetched successfully but
// contained no items newer than the ones already served; callers treat it
// as a no-op rather than a fetch failure.
var ErrNoRecentArticles = errors.New("no recent articles")
28
// UserAgentTransport decorates an http.RoundTripper so every outgoing
// request carries the feeds service's identifying headers.
type UserAgentTransport struct {
	http.RoundTripper
}
32
33func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
34 userAgent := "linux:feeds:v2 (by /u/pico-sh)"
35 r.Header.Set("User-Agent", userAgent)
36 r.Header.Set("Accept", "*/*")
37 return c.RoundTripper.RoundTrip(r)
38}
39
40var httpClient = http.Client{
41 Transport: &UserAgentTransport{
42 &http.Transport{
43 TLSClientConfig: &tls.Config{},
44 },
45 },
46}
47
// FeedItemTmpl is the template-facing view of a single feed entry used
// when rendering the digest email.
type FeedItemTmpl struct {
	GUID        string
	Title       string
	Link        string
	PublishedAt *time.Time
	// Content and Description are typed html.HTML so the digest template
	// renders the feed's own markup without escaping it.
	Content     html.HTML
	Description html.HTML
}
56
// Feed pairs a parsed feed's metadata with both the template-ready items
// (Items) and the raw gofeed entries (FeedItems), the latter being used
// to persist served-item ids for deduplication.
type Feed struct {
	Title       string
	Link        string
	Description string
	Items       []*FeedItemTmpl
	FeedItems   []*gofeed.Item
}
64
// DigestFeed is the root template payload for one digest email.
type DigestFeed struct {
	Feeds        []*Feed
	Options      DigestOptions
	KeepAliveURL string // link to the keep-alive endpoint for this post
	UnsubURL     string // per-post unsubscribe link
	DaysLeft     string // days until the post expires, as a string
	ShowBanner   bool   // set when expiration is 30 days or less away
	SizeWarning  bool   // set when inline content was dropped to cap email size
}
74
// DigestOptions carries per-post rendering flags passed to the digest
// templates.
type DigestOptions struct {
	// InlineContent is consumed by the templates; presumably it toggles
	// rendering full item bodies inline — TODO confirm against
	// html/digest.page.tmpl.
	InlineContent bool
}
78
79func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
80 return &FeedItemTmpl{
81 Title: item.Title,
82 Link: item.Link,
83 PublishedAt: item.PublishedParsed,
84 Description: html.HTML(item.Description),
85 Content: html.HTML(item.Content),
86 }
87}
88
// DigestIntervalToCron maps the legacy `digest_interval` values onto cron
// expressions. The empty string and any unrecognized interval fall back to
// a daily run at 13:00.
func DigestIntervalToCron(interval string) string {
	schedules := map[string]string{
		"10min":  "*/10 * * * *",
		"1hour":  "0 * * * *",
		"6hour":  "0 */6 * * *",
		"12hour": "0 */12 * * *",
		"1day":   "0 13 * * *",
		"7day":   "0 13 * * 0",
		"30day":  "0 13 1 * *",
	}
	if cron, ok := schedules[interval]; ok {
		return cron
	}
	// "" (property unset) and unknown values both default to once a day.
	return "0 13 * * *"
}
109
110func getFeedItemID(logger *slog.Logger, item *gofeed.Item) string {
111 guid := strings.ToValidUTF8(item.GUID, "")
112 if item.GUID == "" {
113 logger.Info("no <guid> found for feed item, using <link> instead for its unique id")
114 return strings.ToValidUTF8(item.Link, "")
115 }
116 return guid
117}
118
119// see if this feed item should be emailed to user.
120func isValidItem(logger *slog.Logger, item *gofeed.Item, feedItems []*db.FeedItem) bool {
121 for _, feedItem := range feedItems {
122 if getFeedItemID(logger, item) == feedItem.GUID {
123 return false
124 }
125 }
126
127 return true
128}
129
// Fetcher runs the feed-digest cron: it loads users' feed posts, fetches
// the feeds they list, renders digest emails, and delivers them over SMTP.
type Fetcher struct {
	cfg  *shared.ConfigSite
	db   db.DB
	auth sasl.Client  // SMTP PLAIN credentials (PICO_SMTP_USER / PICO_SMTP_PASS)
	gron *gronx.Gronx // cron-expression validator/scheduler
	host string       // SMTP server address (PICO_SMTP_HOST)
}
137
138func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
139 host := os.Getenv("PICO_SMTP_HOST")
140 smtPass := os.Getenv("PICO_SMTP_PASS")
141 emailLogin := os.Getenv("PICO_SMTP_USER")
142
143 auth := sasl.NewPlainClient("", emailLogin, smtPass)
144 gron := gronx.New()
145 return &Fetcher{
146 db: dbpool,
147 cfg: cfg,
148 auth: auth,
149 gron: gron,
150 host: host,
151 }
152}
153
154func DateToMin(now time.Time) time.Time {
155 return time.Date(
156 now.Year(), now.Month(), now.Day(),
157 now.Hour(), now.Minute(),
158 0, 0, // zero out second and nano-second for cron
159 now.Location(),
160 )
161}
162
163func (f *Fetcher) Validate(logger *slog.Logger, post *db.Post, parsed *shared.ListParsedText, now time.Time) error {
164 expiresAt := post.ExpiresAt
165 if expiresAt != nil {
166 if post.ExpiresAt.Before(now) {
167 return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
168 }
169 }
170
171 cron := parsed.Cron
172 // support for posts with deprecated `digest_interval` property
173 if parsed.DigestInterval != "" {
174 cron = DigestIntervalToCron(parsed.DigestInterval)
175 }
176 logger.Info("found cron", "cron", cron)
177
178 if !f.gron.IsValid(cron) {
179 return fmt.Errorf("(%s) is invalid `cron`, skipping", cron)
180 }
181
182 dt := DateToMin(now)
183 isDue, err := f.gron.IsDue(cron, dt)
184 if err != nil {
185 return fmt.Errorf("cron error, skipping; err: %w", err)
186 }
187 if !isDue {
188 nextTime, _ := gronx.NextTickAfter(cron, dt, true)
189 return fmt.Errorf(
190 "cron not time to digest, skipping; cur run: %s, next run: %s",
191 f.gron.C.GetRef(),
192 nextTime,
193 )
194 }
195 return nil
196}
197
198func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post, skipValidation bool, now time.Time) error {
199 logger = logger.With("filename", post.Filename)
200 logger.Info("running feed post")
201
202 parsed := shared.ListParseText(post.Text)
203
204 if parsed.Email == "" {
205 logger.Error("post does not have an email associated, removing post")
206 err := f.db.RemovePosts([]string{post.ID})
207 if err != nil {
208 return err
209 }
210 }
211
212 if post.Data.LastDigest != nil {
213 logger.Info("last digest", "at", post.Data.LastDigest.Format(time.RFC3339))
214 } else {
215 logger.Info("first time running post")
216 }
217 err := f.Validate(logger, post, parsed, now)
218 if err != nil {
219 logger.Info("validation failed", "err", err)
220 if skipValidation {
221 logger.Info("overriding validation error, continuing")
222 } else {
223 return nil
224 }
225 }
226 logger.Info("validation success")
227
228 urls := []string{}
229 for _, item := range parsed.Items {
230 u := ""
231 if item.IsText || item.IsURL {
232 u = item.Value
233 } else if item.IsURL {
234 u = string(item.Value)
235 }
236
237 if u == "" {
238 continue
239 }
240
241 _, err := url.Parse(string(item.URL))
242 if err != nil {
243 logger.Info("invalid url", "url", string(item.URL))
244 continue
245 }
246
247 logger.Info("found rss feed url", "url", u)
248 urls = append(urls, u)
249 }
250
251 if post.ExpiresAt == nil {
252 expiresAt := now.AddDate(0, 12, 0)
253 post.ExpiresAt = &expiresAt
254 }
255 _, err = f.db.UpdatePost(post)
256 if err != nil {
257 return err
258 }
259
260 subject := fmt.Sprintf("%s feed digest", post.Filename)
261 unsubURL := getUnsubURL(post)
262
263 msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, post)
264 if err != nil {
265 errForUser := err
266
267 // we don't want to increment in this case
268 if errors.Is(errForUser, ErrNoRecentArticles) {
269 return nil
270 }
271
272 post.Data.Attempts += 1
273 logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)
274
275 maxAttempts := 10
276 errBody := fmt.Sprintf(`There was an error attempting to fetch your feeds (%d) times. After (%d) attempts we remove the file from our system. Please check all the URLs and re-upload.
277Also, we have centralized logs in our pico.sh TUI that will display realtime feed errors so you can debug.
278
279
280%s
281
282
283%s`, post.Data.Attempts, maxAttempts, errForUser.Error(), post.Text)
284 err = f.SendEmail(
285 logger,
286 user.Name,
287 parsed.Email,
288 subject,
289 unsubURL,
290 &MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
291 )
292 if err != nil {
293 return err
294 }
295
296 if post.Data.Attempts >= maxAttempts {
297 err = f.db.RemovePosts([]string{post.ID})
298 if err != nil {
299 return err
300 }
301 } else {
302 _, err = f.db.UpdatePost(post)
303 if err != nil {
304 return err
305 }
306 }
307 return errForUser
308 } else {
309 post.Data.Attempts = 0
310 _, err := f.db.UpdatePost(post)
311 if err != nil {
312 return err
313 }
314 }
315
316 if msgBody != nil {
317 err = f.SendEmail(logger, user.Name, parsed.Email, subject, unsubURL, msgBody)
318 if err != nil {
319 return err
320 }
321 }
322
323 post.Data.LastDigest = &now
324 _, err = f.db.UpdatePost(post)
325 if err != nil {
326 return err
327 }
328
329 return nil
330}
331
332func (f *Fetcher) RunUser(user *db.User, now time.Time) error {
333 logger := shared.LoggerWithUser(f.cfg.Logger, user)
334 logger.Info("run user")
335 posts, err := f.db.FindPostsForUser(&db.Pager{Num: 100}, user.ID, "feeds")
336 if err != nil {
337 return err
338 }
339
340 if len(posts.Data) > 0 {
341 logger.Info("found feed posts", "len", len(posts.Data))
342 }
343
344 for _, post := range posts.Data {
345 err = f.RunPost(logger, user, post, false, now)
346 if err != nil {
347 logger.Error("run post failed", "err", err)
348 }
349 }
350
351 return nil
352}
353
354func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
355 req, err := http.NewRequest("GET", url, nil)
356 if err != nil {
357 return nil, err
358 }
359
360 resp, err := httpClient.Do(req)
361 if err != nil {
362 return nil, err
363 }
364
365 defer func() {
366 _ = resp.Body.Close()
367 }()
368 body, err := io.ReadAll(resp.Body)
369 if err != nil {
370 return nil, err
371 }
372
373 if resp.StatusCode < 200 || resp.StatusCode > 300 {
374 return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
375 }
376
377 feed, err := fp.ParseString(string(body))
378 if err != nil {
379 return nil, err
380 }
381
382 return feed, nil
383}
384
385func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
386 logger.Info("fetching feed", "url", url)
387
388 feed, err := f.ParseURL(fp, url)
389 if err != nil {
390 return nil, err
391 }
392
393 feedTmpl := &Feed{
394 Title: feed.Title,
395 Description: feed.Description,
396 Link: feed.Link,
397 }
398
399 items := []*FeedItemTmpl{}
400 gofeedItems := []*gofeed.Item{}
401 // we only want to return feed items published since the last digest time we fetched
402 for _, item := range feed.Items {
403 if item == nil {
404 continue
405 }
406
407 if !isValidItem(logger, item, feedItems) {
408 logger.Info("feed item already served", "guid", item.GUID)
409 continue
410 }
411
412 gofeedItems = append(gofeedItems, item)
413 items = append(items, itemToTemplate(item))
414 }
415
416 if len(items) == 0 {
417 return nil, fmt.Errorf(
418 "%s %w, skipping",
419 url,
420 ErrNoRecentArticles,
421 )
422 }
423
424 feedTmpl.FeedItems = gofeedItems
425 feedTmpl.Items = items
426 return feedTmpl, nil
427}
428
429func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
430 ts, err := template.ParseFiles(
431 f.cfg.StaticPath("html/digest_text.page.tmpl"),
432 )
433
434 if err != nil {
435 return "", err
436 }
437
438 w := new(strings.Builder)
439 err = ts.Execute(w, feedTmpl)
440 if err != nil {
441 return "", err
442 }
443
444 return w.String(), nil
445}
446
447func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
448 ts, err := html.ParseFiles(
449 f.cfg.StaticPath("html/digest.page.tmpl"),
450 )
451
452 if err != nil {
453 return "", err
454 }
455
456 w := new(strings.Builder)
457 err = ts.Execute(w, feedTmpl)
458 if err != nil {
459 return "", err
460 }
461
462 return w.String(), nil
463}
464
// MsgBody carries both MIME alternatives (HTML and plain text) of a
// digest email.
type MsgBody struct {
	Html string
	Text string
}
469
470func getUnsubURL(post *db.Post) string {
471 return fmt.Sprintf("https://feeds.pico.sh/unsub/%s", post.ID)
472}
473
474func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, post *db.Post) (*MsgBody, error) {
475 logger.Info("fetching feeds", "inlineContent", inlineContent)
476 fp := gofeed.NewParser()
477 daysLeft := ""
478 showBanner := false
479 if post.ExpiresAt != nil {
480 diff := time.Until(*post.ExpiresAt)
481 daysLeftInt := int(math.Ceil(diff.Hours() / 24))
482 daysLeft = fmt.Sprintf("%d", daysLeftInt)
483 if daysLeftInt <= 30 {
484 showBanner = true
485 }
486 }
487 feeds := &DigestFeed{
488 KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
489 UnsubURL: getUnsubURL(post),
490 DaysLeft: daysLeft,
491 ShowBanner: showBanner,
492 Options: DigestOptions{InlineContent: inlineContent},
493 }
494 feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
495 if err != nil {
496 return nil, err
497 }
498
499 if len(urls) == 0 {
500 return nil, fmt.Errorf("feed file does not contain any urls")
501 }
502
503 var allErrors error
504 for _, url := range urls {
505 feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
506 if err != nil {
507 if errors.Is(err, ErrNoRecentArticles) {
508 logger.Info("no recent articles", "err", err)
509 } else {
510 allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
511 logger.Error("fetch error", "err", err)
512 }
513 continue
514 }
515 feeds.Feeds = append(feeds.Feeds, feedTmpl)
516 }
517
518 if len(feeds.Feeds) == 0 {
519 if allErrors != nil {
520 return nil, allErrors
521 }
522 return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
523 }
524
525 fdi := []*db.FeedItem{}
526 for _, feed := range feeds.Feeds {
527 for _, item := range feed.FeedItems {
528 uid := getFeedItemID(logger, item)
529 fdi = append(fdi, &db.FeedItem{
530 PostID: post.ID,
531 GUID: uid,
532 Data: db.FeedItemData{
533 Title: strings.ToValidUTF8(item.Title, ""),
534 Description: strings.ToValidUTF8(item.Description, ""),
535 Content: strings.ToValidUTF8(item.Content, ""),
536 Link: item.Link,
537 PublishedAt: item.PublishedParsed,
538 },
539 })
540 }
541 }
542 err = f.db.InsertFeedItems(post.ID, fdi)
543 if err != nil {
544 return nil, err
545 }
546
547 text, err := f.PrintText(feeds)
548 if err != nil {
549 return nil, err
550 }
551
552 html, err := f.PrintHtml(feeds)
553 if err != nil {
554 return nil, err
555 }
556
557 // cap body size to prevent abuse
558 if len(html)+len(text) > 5*utils.MB {
559 feeds.Options.InlineContent = false
560 feeds.SizeWarning = true
561 html, err = f.PrintHtml(feeds)
562 if err != nil {
563 return nil, err
564 }
565 }
566
567 if allErrors != nil {
568 text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
569 html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
570 }
571
572 return &MsgBody{
573 Text: text,
574 Html: html,
575 }, nil
576}
577
578func (f *Fetcher) SendEmail(logger *slog.Logger, username, email, subject, unsubURL string, msg *MsgBody) error {
579 if email == "" {
580 return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
581 }
582 smtpAddr := f.host
583 fromEmail := "hello@pico.sh"
584 to := []string{email}
585 headers := map[string]string{
586 "From": fromEmail,
587 "Subject": subject,
588 "To": email,
589 "MIME-Version": "1.0",
590 "Content-Type": `multipart/alternative; boundary="boundary123"`,
591 "List-Unsubscribe": "<" + unsubURL + ">",
592 }
593 var content strings.Builder
594 for k, v := range headers {
595 content.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
596 }
597 content.WriteString("\r\n")
598 content.WriteString("\r\n--boundary123\r\n")
599 content.WriteString("Content-Type: text/plain; charset=\"utf-8\"\r\n")
600 content.WriteString("\r\n" + msg.Text + "\r\n")
601 content.WriteString("--boundary123\r\n")
602 content.WriteString("Content-Type: text/html; charset=\"utf-8\"\r\n")
603 content.WriteString("\r\n" + msg.Html + "\r\n")
604 content.WriteString("--boundary123--")
605
606 reader := strings.NewReader(content.String())
607 logger.Info("sending email digest")
608 err := smtp.SendMail(
609 smtpAddr,
610 f.auth,
611 fromEmail,
612 to,
613 reader,
614 )
615 return err
616}
617
618func (f *Fetcher) Run(now time.Time) error {
619 users, err := f.db.FindUsersWithPost("feeds")
620 if err != nil {
621 return err
622 }
623
624 for _, user := range users {
625 err := f.RunUser(user, now)
626 if err != nil {
627 f.cfg.Logger.Error("run user failed", "err", err)
628 continue
629 }
630 }
631
632 return nil
633}
634
635func (f *Fetcher) Loop() {
636 logger := f.cfg.Logger
637 for {
638 logger.Info("running digest emailer")
639
640 err := f.Run(time.Now().UTC())
641 if err != nil {
642 logger.Error("run failed", "err", err)
643 }
644
645 logger.Info("digest emailer finished, waiting 1min ...")
646 time.Sleep(1 * time.Minute)
647 }
648}