Antonio Mika
·
2025-03-12
cron.go
1package feeds
2
3import (
4 "crypto/tls"
5 "errors"
6 "fmt"
7 html "html/template"
8 "io"
9 "log/slog"
10 "math"
11 "net/http"
12 "net/url"
13 "strings"
14 "text/template"
15 "time"
16
17 "github.com/mmcdole/gofeed"
18 "github.com/picosh/pico/pkg/db"
19 "github.com/picosh/pico/pkg/shared"
20 "github.com/sendgrid/sendgrid-go"
21 "github.com/sendgrid/sendgrid-go/helpers/mail"
22)
23
// ErrNoRecentArticles is the sentinel error reported when a fetch produces no
// feed items that have not already been served. Callers in this file detect
// it with errors.Is and treat it as "nothing to send" rather than a failure.
var ErrNoRecentArticles = errors.New("no recent articles")
25
// UserAgentTransport is an http.RoundTripper that forces a fixed User-Agent
// and a permissive Accept header onto every outgoing request before handing
// it to the embedded RoundTripper.
type UserAgentTransport struct {
	http.RoundTripper
}

// RoundTrip sends r through the embedded RoundTripper with the User-Agent and
// Accept headers overridden.
//
// The headers are written to a clone of r: the http.RoundTripper contract
// forbids modifying the caller's request, and a caller that reuses or retries
// the same *http.Request must not observe our header changes.
func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	const userAgent = "linux:feeds:v2 (by /u/pico-sh)"

	req := r.Clone(r.Context())
	if req.Header == nil {
		// Clone preserves a nil header map; Set would panic on it.
		req.Header = make(http.Header)
	}
	req.Header.Set("User-Agent", userAgent)
	req.Header.Set("Accept", "*/*")
	return c.RoundTripper.RoundTrip(req)
}
36
37var httpClient = http.Client{
38 Transport: &UserAgentTransport{
39 &http.Transport{
40 TLSClientConfig: &tls.Config{},
41 },
42 },
43}
44
// FeedItemTmpl is the view model for one feed item as handed to the digest
// templates.
type FeedItemTmpl struct {
	// GUID is not populated by itemToTemplate in this file.
	GUID        string
	Title       string
	Link        string
	PublishedAt *time.Time
	// Content and Description are typed html.HTML, so html/template emits
	// them without escaping. They are copied verbatim from the remote feed.
	Content     html.HTML
	Description html.HTML
}
53
// Feed is one fetched feed together with the items selected for the digest.
type Feed struct {
	Title       string
	Link        string
	Description string
	// Items holds the template view of each selected item.
	Items       []*FeedItemTmpl
	// FeedItems holds the parsed gofeed items that Items was built from;
	// Fetch appends to both slices in lockstep.
	FeedItems   []*gofeed.Item
}
61
// DigestFeed is the root value passed to the digest templates: every feed
// that produced items plus the per-post links and banner state.
type DigestFeed struct {
	Feeds        []*Feed
	Options      DigestOptions
	// KeepAliveURL and UnsubURL are built from the post ID in FetchAll.
	KeepAliveURL string
	UnsubURL     string
	// DaysLeft is the number of days until the post expires, formatted as a
	// decimal string; it stays empty when the post has no expiry.
	DaysLeft     string
	// ShowBanner is set when the post expires in 30 days or fewer.
	ShowBanner   bool
}
70
// DigestOptions carries rendering options through to the digest templates.
type DigestOptions struct {
	// InlineContent mirrors the inlineContent flag given to FetchAll.
	// NOTE(review): presumably the templates use it to decide whether item
	// bodies are embedded in the email; confirm against the templates.
	InlineContent bool
}
74
75func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
76 return &FeedItemTmpl{
77 Title: item.Title,
78 Link: item.Link,
79 PublishedAt: item.PublishedParsed,
80 Description: html.HTML(item.Description),
81 Content: html.HTML(item.Content),
82 }
83}
84
// DigestOptionToTime returns the earliest time at which the next digest may
// be sent, given when the last one went out and the configured interval.
//
// Recognised intervals are "10min", "1hour", "6hour", "12hour", "1day",
// "7day" and "30day"; the empty string is treated as "1day". Any other value
// returns lastDigest unchanged.
func DigestOptionToTime(lastDigest time.Time, interval string) time.Time {
	const day = 24 * time.Hour

	var wait time.Duration
	switch interval {
	case "10min":
		wait = 10 * time.Minute
	case "1hour":
		wait = time.Hour
	case "6hour":
		wait = 6 * time.Hour
	case "12hour":
		wait = 12 * time.Hour
	case "1day", "":
		wait = day
	case "7day":
		wait = 7 * day
	case "30day":
		wait = 30 * day
	default:
		return lastDigest
	}
	return lastDigest.Add(wait)
}
105
106func getFeedItemID(logger *slog.Logger, item *gofeed.Item) string {
107 guid := item.GUID
108 if item.GUID == "" {
109 logger.Info("no <guid> found for feed item, using <link> instead for its unique id")
110 return item.Link
111 }
112 return guid
113}
114
115// see if this feed item should be emailed to user.
116func isValidItem(logger *slog.Logger, item *gofeed.Item, feedItems []*db.FeedItem) bool {
117 for _, feedItem := range feedItems {
118 if getFeedItemID(logger, item) == feedItem.GUID {
119 return false
120 }
121 }
122
123 return true
124}
125
// Fetcher runs the feed digest job: it loads feed posts from the database,
// downloads the listed feeds and emails the resulting digests.
type Fetcher struct {
	// cfg supplies the logger, static template paths and the SendGrid key.
	cfg *shared.ConfigSite
	// db is the store for users, posts and already-served feed items.
	db  db.DB
}
130
131func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
132 return &Fetcher{
133 db: dbpool,
134 cfg: cfg,
135 }
136}
137
138func (f *Fetcher) Validate(post *db.Post, parsed *shared.ListParsedText) error {
139 lastDigest := post.Data.LastDigest
140 if lastDigest == nil {
141 return nil
142 }
143
144 now := time.Now().UTC()
145
146 expiresAt := post.ExpiresAt
147 if expiresAt != nil {
148 if post.ExpiresAt.Before(now) {
149 return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
150 }
151 }
152
153 digestAt := DigestOptionToTime(*lastDigest, parsed.DigestInterval)
154 if digestAt.After(now) {
155 return fmt.Errorf("(%s) not time to digest, skipping", digestAt.Format(time.RFC3339))
156 }
157 return nil
158}
159
160func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post, skipValidation bool) error {
161 logger = logger.With("filename", post.Filename)
162 logger.Info("running feed post")
163
164 parsed := shared.ListParseText(post.Text)
165
166 if parsed.Email == "" {
167 logger.Error("post does not have an email associated, removing post")
168 err := f.db.RemovePosts([]string{post.ID})
169 if err != nil {
170 return err
171 }
172 }
173
174 logger.Info("last digest at", "lastDigest", post.Data.LastDigest.Format(time.RFC3339))
175 err := f.Validate(post, parsed)
176 if err != nil {
177 logger.Info("validation failed", "err", err)
178 if skipValidation {
179 logger.Info("overriding validation error, continuing")
180 } else {
181 return nil
182 }
183 }
184
185 urls := []string{}
186 for _, item := range parsed.Items {
187 u := ""
188 if item.IsText || item.IsURL {
189 u = item.Value
190 } else if item.IsURL {
191 u = string(item.Value)
192 }
193
194 if u == "" {
195 continue
196 }
197
198 _, err := url.Parse(string(item.URL))
199 if err != nil {
200 logger.Info("invalid url", "url", string(item.URL))
201 continue
202 }
203
204 logger.Info("found rss feed url", "url", u)
205 urls = append(urls, u)
206 }
207
208 now := time.Now().UTC()
209 if post.ExpiresAt == nil {
210 expiresAt := time.Now().AddDate(0, 12, 0)
211 post.ExpiresAt = &expiresAt
212 }
213 _, err = f.db.UpdatePost(post)
214 if err != nil {
215 return err
216 }
217
218 subject := fmt.Sprintf("%s feed digest", post.Title)
219
220 msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, post)
221 if err != nil {
222 errForUser := err
223
224 // we don't want to increment in this case
225 if errors.Is(errForUser, ErrNoRecentArticles) {
226 return nil
227 }
228
229 post.Data.Attempts += 1
230 logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)
231
232 maxAttempts := 10
233 errBody := fmt.Sprintf(`There was an error attempting to fetch your feeds (%d) times. After (%d) attempts we remove the file from our system. Please check all the URLs and re-upload.
234Also, we have centralized logs in our pico.sh TUI that will display realtime feed errors so you can debug.
235
236
237%s
238
239
240%s`, post.Data.Attempts, maxAttempts, errForUser.Error(), post.Text)
241 err = f.SendEmail(
242 logger, user.Name,
243 parsed.Email,
244 subject,
245 &MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
246 )
247 if err != nil {
248 return err
249 }
250
251 if post.Data.Attempts >= maxAttempts {
252 err = f.db.RemovePosts([]string{post.ID})
253 if err != nil {
254 return err
255 }
256 } else {
257 _, err = f.db.UpdatePost(post)
258 if err != nil {
259 return err
260 }
261 }
262 return errForUser
263 } else {
264 post.Data.Attempts = 0
265 _, err := f.db.UpdatePost(post)
266 if err != nil {
267 return err
268 }
269 }
270
271 if msgBody != nil {
272 err = f.SendEmail(logger, user.Name, parsed.Email, subject, msgBody)
273 if err != nil {
274 return err
275 }
276 }
277
278 post.Data.LastDigest = &now
279 _, err = f.db.UpdatePost(post)
280 if err != nil {
281 return err
282 }
283
284 return nil
285}
286
287func (f *Fetcher) RunUser(user *db.User) error {
288 logger := shared.LoggerWithUser(f.cfg.Logger, user)
289 posts, err := f.db.FindPostsForUser(&db.Pager{Num: 100}, user.ID, "feeds")
290 if err != nil {
291 return err
292 }
293
294 if len(posts.Data) > 0 {
295 logger.Info("found feed posts", "len", len(posts.Data))
296 }
297
298 for _, post := range posts.Data {
299 err = f.RunPost(logger, user, post, false)
300 if err != nil {
301 logger.Error("run post failed", "err", err)
302 }
303 }
304
305 return nil
306}
307
308func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
309 req, err := http.NewRequest("GET", url, nil)
310 if err != nil {
311 return nil, err
312 }
313
314 resp, err := httpClient.Do(req)
315 if err != nil {
316 return nil, err
317 }
318
319 defer resp.Body.Close()
320 body, err := io.ReadAll(resp.Body)
321 if err != nil {
322 return nil, err
323 }
324
325 if resp.StatusCode < 200 || resp.StatusCode > 300 {
326 return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
327 }
328
329 feed, err := fp.ParseString(string(body))
330 if err != nil {
331 return nil, err
332 }
333
334 return feed, nil
335}
336
337func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
338 logger.Info("fetching feed", "url", url)
339
340 feed, err := f.ParseURL(fp, url)
341 if err != nil {
342 return nil, err
343 }
344
345 feedTmpl := &Feed{
346 Title: feed.Title,
347 Description: feed.Description,
348 Link: feed.Link,
349 }
350
351 items := []*FeedItemTmpl{}
352 gofeedItems := []*gofeed.Item{}
353 // we only want to return feed items published since the last digest time we fetched
354 for _, item := range feed.Items {
355 if item == nil {
356 continue
357 }
358
359 if !isValidItem(logger, item, feedItems) {
360 logger.Info("feed item already served", "guid", item.GUID)
361 continue
362 }
363
364 gofeedItems = append(gofeedItems, item)
365 items = append(items, itemToTemplate(item))
366 }
367
368 if len(items) == 0 {
369 return nil, fmt.Errorf(
370 "%s %w, skipping",
371 url,
372 ErrNoRecentArticles,
373 )
374 }
375
376 feedTmpl.FeedItems = gofeedItems
377 feedTmpl.Items = items
378 return feedTmpl, nil
379}
380
381func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
382 ts, err := template.ParseFiles(
383 f.cfg.StaticPath("html/digest_text.page.tmpl"),
384 )
385
386 if err != nil {
387 return "", err
388 }
389
390 w := new(strings.Builder)
391 err = ts.Execute(w, feedTmpl)
392 if err != nil {
393 return "", err
394 }
395
396 return w.String(), nil
397}
398
399func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
400 ts, err := html.ParseFiles(
401 f.cfg.StaticPath("html/digest.page.tmpl"),
402 )
403
404 if err != nil {
405 return "", err
406 }
407
408 w := new(strings.Builder)
409 err = ts.Execute(w, feedTmpl)
410 if err != nil {
411 return "", err
412 }
413
414 return w.String(), nil
415}
416
// MsgBody holds the two renderings of one email, passed to SendEmail as the
// HTML and plain-text content of the message.
type MsgBody struct {
	Html string
	Text string
}
421
422func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, post *db.Post) (*MsgBody, error) {
423 logger.Info("fetching feeds", "inlineContent", inlineContent)
424 fp := gofeed.NewParser()
425 daysLeft := ""
426 showBanner := false
427 if post.ExpiresAt != nil {
428 diff := time.Until(*post.ExpiresAt)
429 daysLeftInt := int(math.Ceil(diff.Hours() / 24))
430 daysLeft = fmt.Sprintf("%d", daysLeftInt)
431 if daysLeftInt <= 30 {
432 showBanner = true
433 }
434 }
435 feeds := &DigestFeed{
436 KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
437 UnsubURL: fmt.Sprintf("https://feeds.pico.sh/unsub/%s", post.ID),
438 DaysLeft: daysLeft,
439 ShowBanner: showBanner,
440 Options: DigestOptions{InlineContent: inlineContent},
441 }
442 feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
443 if err != nil {
444 return nil, err
445 }
446
447 if len(urls) == 0 {
448 return nil, fmt.Errorf("feed file does not contain any urls")
449 }
450
451 var allErrors error
452 for _, url := range urls {
453 feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
454 if err != nil {
455 if errors.Is(err, ErrNoRecentArticles) {
456 logger.Info("no recent articles", "err", err)
457 } else {
458 allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
459 logger.Error("fetch error", "err", err)
460 }
461 continue
462 }
463 feeds.Feeds = append(feeds.Feeds, feedTmpl)
464 }
465
466 if len(feeds.Feeds) == 0 {
467 if allErrors != nil {
468 return nil, allErrors
469 }
470 return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
471 }
472
473 fdi := []*db.FeedItem{}
474 for _, feed := range feeds.Feeds {
475 for _, item := range feed.FeedItems {
476 uid := getFeedItemID(logger, item)
477 fdi = append(fdi, &db.FeedItem{
478 PostID: post.ID,
479 GUID: uid,
480 Data: db.FeedItemData{
481 Title: item.Title,
482 Description: item.Description,
483 Content: item.Content,
484 Link: item.Link,
485 PublishedAt: item.PublishedParsed,
486 },
487 })
488 }
489 }
490 err = f.db.InsertFeedItems(post.ID, fdi)
491 if err != nil {
492 return nil, err
493 }
494
495 text, err := f.PrintText(feeds)
496 if err != nil {
497 return nil, err
498 }
499
500 html, err := f.PrintHtml(feeds)
501 if err != nil {
502 return nil, err
503 }
504
505 if allErrors != nil {
506 text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
507 html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
508 }
509
510 return &MsgBody{
511 Text: text,
512 Html: html,
513 }, nil
514}
515
516func (f *Fetcher) SendEmail(logger *slog.Logger, username, email string, subject string, msg *MsgBody) error {
517 if email == "" {
518 return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
519 }
520
521 from := mail.NewEmail("team pico", shared.DefaultEmail)
522 to := mail.NewEmail(username, email)
523
524 // f.logger.Infof("message body (%s)", plainTextContent)
525
526 message := mail.NewSingleEmail(from, subject, to, msg.Text, msg.Html)
527 client := sendgrid.NewSendClient(f.cfg.SendgridKey)
528
529 logger.Info("sending email digest")
530 response, err := client.Send(message)
531 if err != nil {
532 return err
533 }
534
535 // f.logger.Infof("(%s) email digest response: %v", username, response)
536
537 if len(response.Headers["X-Message-Id"]) > 0 {
538 logger.Info(
539 "successfully sent email digest",
540 "email", email,
541 "x-message-id", response.Headers["X-Message-Id"][0],
542 )
543 } else {
544 logger.Error(
545 "could not find x-message-id, which means sending an email failed",
546 "email", email,
547 )
548 }
549
550 return nil
551}
552
553func (f *Fetcher) Run(logger *slog.Logger) error {
554 users, err := f.db.FindUsers()
555 if err != nil {
556 return err
557 }
558
559 for _, user := range users {
560 err := f.RunUser(user)
561 if err != nil {
562 logger.Error("run user failed", "err", err)
563 continue
564 }
565 }
566
567 return nil
568}
569
570func (f *Fetcher) Loop() {
571 logger := f.cfg.Logger
572 for {
573 logger.Info("running digest emailer")
574
575 err := f.Run(logger)
576 if err != nil {
577 logger.Error("run failed", "err", err)
578 }
579
580 logger.Info("digest emailer finished, waiting 10 mins")
581 time.Sleep(10 * time.Minute)
582 }
583}