Eric Bower
·
2025-05-25
cron.go
1package feeds
2
3import (
4 "crypto/tls"
5 "errors"
6 "fmt"
7 html "html/template"
8 "io"
9 "log/slog"
10 "math"
11 "net/http"
12 "net/url"
13 "strings"
14 "text/template"
15 "time"
16
17 "github.com/mmcdole/gofeed"
18 "github.com/picosh/pico/pkg/db"
19 "github.com/picosh/pico/pkg/shared"
20 "github.com/sendgrid/sendgrid-go"
21 "github.com/sendgrid/sendgrid-go/helpers/mail"
22)
23
// ErrNoRecentArticles is the sentinel error reported when fetching produced no
// feed items that still need to be emailed. It is wrapped with %w by the code
// that returns it, so match it with errors.Is rather than ==.
var ErrNoRecentArticles = errors.New("no recent articles")
25
// UserAgentTransport is an http.RoundTripper that stamps every outgoing
// request with this service's User-Agent and a permissive Accept header before
// delegating to the embedded RoundTripper.
type UserAgentTransport struct {
	http.RoundTripper
}

// RoundTrip forwards r to the embedded RoundTripper with the User-Agent and
// Accept headers overridden.
//
// The headers are set on a clone of r: the http.RoundTripper contract forbids
// modifying the caller's request, and callers may reuse r after the call.
func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	req := r.Clone(r.Context())
	if req.Header == nil {
		// Clone preserves a nil header map; Set on a nil map would panic.
		req.Header = make(http.Header)
	}
	req.Header.Set("User-Agent", "linux:feeds:v2 (by /u/pico-sh)")
	req.Header.Set("Accept", "*/*")
	return c.RoundTripper.RoundTrip(req)
}
36
37var httpClient = http.Client{
38 Transport: &UserAgentTransport{
39 &http.Transport{
40 TLSClientConfig: &tls.Config{},
41 },
42 },
43}
44
// FeedItemTmpl is the per-article view model exposed to the digest templates.
type FeedItemTmpl struct {
	// GUID, Title and Link are plain strings describing the article.
	GUID, Title, Link string
	// PublishedAt is the parsed publication time; nil when none is available.
	PublishedAt *time.Time
	// Content and Description are typed html.HTML, which html/template emits
	// verbatim instead of escaping.
	Content, Description html.HTML
}
53
54type Feed struct {
55 Title string
56 Link string
57 Description string
58 Items []*FeedItemTmpl
59 FeedItems []*gofeed.Item
60}
61
62type DigestFeed struct {
63 Feeds []*Feed
64 Options DigestOptions
65 KeepAliveURL string
66 UnsubURL string
67 DaysLeft string
68 ShowBanner bool
69}
70
// DigestOptions holds the rendering switches handed to the digest templates.
type DigestOptions struct {
	// InlineContent is the flag the templates read to decide whether article
	// content is embedded in the email.
	InlineContent bool
}
74
75func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
76 return &FeedItemTmpl{
77 Title: item.Title,
78 Link: item.Link,
79 PublishedAt: item.PublishedParsed,
80 Description: html.HTML(item.Description),
81 Content: html.HTML(item.Content),
82 }
83}
84
// DigestOptionToTime returns the earliest time the next digest may be sent,
// given when the last one went out and the configured interval name.
//
// Recognised intervals are "10min", "1hour", "6hour", "12hour", "1day",
// "7day" and "30day"; the empty string is treated as "1day". Any other value
// returns lastDigest unchanged.
func DigestOptionToTime(lastDigest time.Time, interval string) time.Time {
	const day = 24 * time.Hour

	offsets := map[string]time.Duration{
		"10min":  10 * time.Minute,
		"1hour":  time.Hour,
		"6hour":  6 * time.Hour,
		"12hour": 12 * time.Hour,
		"":       day,
		"1day":   day,
		"7day":   7 * day,
		"30day":  30 * day,
	}

	offset, known := offsets[interval]
	if !known {
		return lastDigest
	}
	return lastDigest.Add(offset)
}
106
107func getFeedItemID(logger *slog.Logger, item *gofeed.Item) string {
108 guid := item.GUID
109 if item.GUID == "" {
110 logger.Info("no <guid> found for feed item, using <link> instead for its unique id")
111 return item.Link
112 }
113 return guid
114}
115
116// see if this feed item should be emailed to user.
117func isValidItem(logger *slog.Logger, item *gofeed.Item, feedItems []*db.FeedItem) bool {
118 for _, feedItem := range feedItems {
119 if getFeedItemID(logger, item) == feedItem.GUID {
120 return false
121 }
122 }
123
124 return true
125}
126
127type Fetcher struct {
128 cfg *shared.ConfigSite
129 db db.DB
130}
131
132func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
133 return &Fetcher{
134 db: dbpool,
135 cfg: cfg,
136 }
137}
138
139func (f *Fetcher) Validate(post *db.Post, parsed *shared.ListParsedText) error {
140 lastDigest := post.Data.LastDigest
141 if lastDigest == nil {
142 return nil
143 }
144
145 now := time.Now().UTC()
146
147 expiresAt := post.ExpiresAt
148 if expiresAt != nil {
149 if post.ExpiresAt.Before(now) {
150 return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
151 }
152 }
153
154 digestAt := DigestOptionToTime(*lastDigest, parsed.DigestInterval)
155 if digestAt.After(now) {
156 return fmt.Errorf("(%s) not time to digest, skipping", digestAt.Format(time.RFC3339))
157 }
158 return nil
159}
160
161func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post, skipValidation bool) error {
162 logger = logger.With("filename", post.Filename)
163 logger.Info("running feed post")
164
165 parsed := shared.ListParseText(post.Text)
166
167 if parsed.Email == "" {
168 logger.Error("post does not have an email associated, removing post")
169 err := f.db.RemovePosts([]string{post.ID})
170 if err != nil {
171 return err
172 }
173 }
174
175 logger.Info("last digest at", "lastDigest", post.Data.LastDigest.Format(time.RFC3339))
176 err := f.Validate(post, parsed)
177 if err != nil {
178 logger.Info("validation failed", "err", err)
179 if skipValidation {
180 logger.Info("overriding validation error, continuing")
181 } else {
182 return nil
183 }
184 }
185
186 urls := []string{}
187 for _, item := range parsed.Items {
188 u := ""
189 if item.IsText || item.IsURL {
190 u = item.Value
191 } else if item.IsURL {
192 u = string(item.Value)
193 }
194
195 if u == "" {
196 continue
197 }
198
199 _, err := url.Parse(string(item.URL))
200 if err != nil {
201 logger.Info("invalid url", "url", string(item.URL))
202 continue
203 }
204
205 logger.Info("found rss feed url", "url", u)
206 urls = append(urls, u)
207 }
208
209 now := time.Now().UTC()
210 if post.ExpiresAt == nil {
211 expiresAt := time.Now().AddDate(0, 12, 0)
212 post.ExpiresAt = &expiresAt
213 }
214 _, err = f.db.UpdatePost(post)
215 if err != nil {
216 return err
217 }
218
219 subject := fmt.Sprintf("%s feed digest", post.Title)
220
221 msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, post)
222 if err != nil {
223 errForUser := err
224
225 // we don't want to increment in this case
226 if errors.Is(errForUser, ErrNoRecentArticles) {
227 return nil
228 }
229
230 post.Data.Attempts += 1
231 logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)
232
233 maxAttempts := 10
234 errBody := fmt.Sprintf(`There was an error attempting to fetch your feeds (%d) times. After (%d) attempts we remove the file from our system. Please check all the URLs and re-upload.
235Also, we have centralized logs in our pico.sh TUI that will display realtime feed errors so you can debug.
236
237
238%s
239
240
241%s`, post.Data.Attempts, maxAttempts, errForUser.Error(), post.Text)
242 err = f.SendEmail(
243 logger, user.Name,
244 parsed.Email,
245 subject,
246 &MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
247 )
248 if err != nil {
249 return err
250 }
251
252 if post.Data.Attempts >= maxAttempts {
253 err = f.db.RemovePosts([]string{post.ID})
254 if err != nil {
255 return err
256 }
257 } else {
258 _, err = f.db.UpdatePost(post)
259 if err != nil {
260 return err
261 }
262 }
263 return errForUser
264 } else {
265 post.Data.Attempts = 0
266 _, err := f.db.UpdatePost(post)
267 if err != nil {
268 return err
269 }
270 }
271
272 if msgBody != nil {
273 err = f.SendEmail(logger, user.Name, parsed.Email, subject, msgBody)
274 if err != nil {
275 return err
276 }
277 }
278
279 post.Data.LastDigest = &now
280 _, err = f.db.UpdatePost(post)
281 if err != nil {
282 return err
283 }
284
285 return nil
286}
287
288func (f *Fetcher) RunUser(user *db.User) error {
289 logger := shared.LoggerWithUser(f.cfg.Logger, user)
290 posts, err := f.db.FindPostsForUser(&db.Pager{Num: 100}, user.ID, "feeds")
291 if err != nil {
292 return err
293 }
294
295 if len(posts.Data) > 0 {
296 logger.Info("found feed posts", "len", len(posts.Data))
297 }
298
299 for _, post := range posts.Data {
300 err = f.RunPost(logger, user, post, false)
301 if err != nil {
302 logger.Error("run post failed", "err", err)
303 }
304 }
305
306 return nil
307}
308
309func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
310 req, err := http.NewRequest("GET", url, nil)
311 if err != nil {
312 return nil, err
313 }
314
315 resp, err := httpClient.Do(req)
316 if err != nil {
317 return nil, err
318 }
319
320 defer func() {
321 _ = resp.Body.Close()
322 }()
323 body, err := io.ReadAll(resp.Body)
324 if err != nil {
325 return nil, err
326 }
327
328 if resp.StatusCode < 200 || resp.StatusCode > 300 {
329 return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
330 }
331
332 feed, err := fp.ParseString(string(body))
333 if err != nil {
334 return nil, err
335 }
336
337 return feed, nil
338}
339
340func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
341 logger.Info("fetching feed", "url", url)
342
343 feed, err := f.ParseURL(fp, url)
344 if err != nil {
345 return nil, err
346 }
347
348 feedTmpl := &Feed{
349 Title: feed.Title,
350 Description: feed.Description,
351 Link: feed.Link,
352 }
353
354 items := []*FeedItemTmpl{}
355 gofeedItems := []*gofeed.Item{}
356 // we only want to return feed items published since the last digest time we fetched
357 for _, item := range feed.Items {
358 if item == nil {
359 continue
360 }
361
362 if !isValidItem(logger, item, feedItems) {
363 logger.Info("feed item already served", "guid", item.GUID)
364 continue
365 }
366
367 gofeedItems = append(gofeedItems, item)
368 items = append(items, itemToTemplate(item))
369 }
370
371 if len(items) == 0 {
372 return nil, fmt.Errorf(
373 "%s %w, skipping",
374 url,
375 ErrNoRecentArticles,
376 )
377 }
378
379 feedTmpl.FeedItems = gofeedItems
380 feedTmpl.Items = items
381 return feedTmpl, nil
382}
383
384func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
385 ts, err := template.ParseFiles(
386 f.cfg.StaticPath("html/digest_text.page.tmpl"),
387 )
388
389 if err != nil {
390 return "", err
391 }
392
393 w := new(strings.Builder)
394 err = ts.Execute(w, feedTmpl)
395 if err != nil {
396 return "", err
397 }
398
399 return w.String(), nil
400}
401
402func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
403 ts, err := html.ParseFiles(
404 f.cfg.StaticPath("html/digest.page.tmpl"),
405 )
406
407 if err != nil {
408 return "", err
409 }
410
411 w := new(strings.Builder)
412 err = ts.Execute(w, feedTmpl)
413 if err != nil {
414 return "", err
415 }
416
417 return w.String(), nil
418}
419
// MsgBody holds the two renderings of one email: Html is the HTML part and
// Text is the plain-text part.
type MsgBody struct {
	Html, Text string
}
424
425func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, post *db.Post) (*MsgBody, error) {
426 logger.Info("fetching feeds", "inlineContent", inlineContent)
427 fp := gofeed.NewParser()
428 daysLeft := ""
429 showBanner := false
430 if post.ExpiresAt != nil {
431 diff := time.Until(*post.ExpiresAt)
432 daysLeftInt := int(math.Ceil(diff.Hours() / 24))
433 daysLeft = fmt.Sprintf("%d", daysLeftInt)
434 if daysLeftInt <= 30 {
435 showBanner = true
436 }
437 }
438 feeds := &DigestFeed{
439 KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
440 UnsubURL: fmt.Sprintf("https://feeds.pico.sh/unsub/%s", post.ID),
441 DaysLeft: daysLeft,
442 ShowBanner: showBanner,
443 Options: DigestOptions{InlineContent: inlineContent},
444 }
445 feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
446 if err != nil {
447 return nil, err
448 }
449
450 if len(urls) == 0 {
451 return nil, fmt.Errorf("feed file does not contain any urls")
452 }
453
454 var allErrors error
455 for _, url := range urls {
456 feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
457 if err != nil {
458 if errors.Is(err, ErrNoRecentArticles) {
459 logger.Info("no recent articles", "err", err)
460 } else {
461 allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
462 logger.Error("fetch error", "err", err)
463 }
464 continue
465 }
466 feeds.Feeds = append(feeds.Feeds, feedTmpl)
467 }
468
469 if len(feeds.Feeds) == 0 {
470 if allErrors != nil {
471 return nil, allErrors
472 }
473 return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
474 }
475
476 fdi := []*db.FeedItem{}
477 for _, feed := range feeds.Feeds {
478 for _, item := range feed.FeedItems {
479 uid := getFeedItemID(logger, item)
480 fdi = append(fdi, &db.FeedItem{
481 PostID: post.ID,
482 GUID: uid,
483 Data: db.FeedItemData{
484 Title: item.Title,
485 Description: item.Description,
486 Content: item.Content,
487 Link: item.Link,
488 PublishedAt: item.PublishedParsed,
489 },
490 })
491 }
492 }
493 err = f.db.InsertFeedItems(post.ID, fdi)
494 if err != nil {
495 return nil, err
496 }
497
498 text, err := f.PrintText(feeds)
499 if err != nil {
500 return nil, err
501 }
502
503 html, err := f.PrintHtml(feeds)
504 if err != nil {
505 return nil, err
506 }
507
508 if allErrors != nil {
509 text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
510 html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
511 }
512
513 return &MsgBody{
514 Text: text,
515 Html: html,
516 }, nil
517}
518
519func (f *Fetcher) SendEmail(logger *slog.Logger, username, email string, subject string, msg *MsgBody) error {
520 if email == "" {
521 return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
522 }
523
524 from := mail.NewEmail("team pico", shared.DefaultEmail)
525 to := mail.NewEmail(username, email)
526
527 // f.logger.Infof("message body (%s)", plainTextContent)
528
529 message := mail.NewSingleEmail(from, subject, to, msg.Text, msg.Html)
530 client := sendgrid.NewSendClient(f.cfg.SendgridKey)
531
532 logger.Info("sending email digest")
533 response, err := client.Send(message)
534 if err != nil {
535 return err
536 }
537
538 // f.logger.Infof("(%s) email digest response: %v", username, response)
539
540 if len(response.Headers["X-Message-Id"]) > 0 {
541 logger.Info(
542 "successfully sent email digest",
543 "email", email,
544 "x-message-id", response.Headers["X-Message-Id"][0],
545 )
546 } else {
547 logger.Error(
548 "could not find x-message-id, which means sending an email failed",
549 "email", email,
550 )
551 }
552
553 return nil
554}
555
556func (f *Fetcher) Run(logger *slog.Logger) error {
557 users, err := f.db.FindUsers()
558 if err != nil {
559 return err
560 }
561
562 for _, user := range users {
563 err := f.RunUser(user)
564 if err != nil {
565 logger.Error("run user failed", "err", err)
566 continue
567 }
568 }
569
570 return nil
571}
572
573func (f *Fetcher) Loop() {
574 logger := f.cfg.Logger
575 for {
576 logger.Info("running digest emailer")
577
578 err := f.Run(logger)
579 if err != nil {
580 logger.Error("run failed", "err", err)
581 }
582
583 logger.Info("digest emailer finished, waiting 10 mins")
584 time.Sleep(10 * time.Minute)
585 }
586}