repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / feeds
Antonio Mika  ·  2025-03-12

cron.go

  1package feeds
  2
  3import (
  4	"crypto/tls"
  5	"errors"
  6	"fmt"
  7	html "html/template"
  8	"io"
  9	"log/slog"
 10	"math"
 11	"net/http"
 12	"net/url"
 13	"strings"
 14	"text/template"
 15	"time"
 16
 17	"github.com/mmcdole/gofeed"
 18	"github.com/picosh/pico/pkg/db"
 19	"github.com/picosh/pico/pkg/shared"
 20	"github.com/sendgrid/sendgrid-go"
 21	"github.com/sendgrid/sendgrid-go/helpers/mail"
 22)
 23
// ErrNoRecentArticles is the sentinel wrapped into the errors returned by
// Fetch and FetchAll when no feed item is left after filtering out the items
// that were already served. Callers detect it with errors.Is.
var ErrNoRecentArticles = errors.New("no recent articles")
 25
 26type UserAgentTransport struct {
 27	http.RoundTripper
 28}
 29
 30func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
 31	userAgent := "linux:feeds:v2 (by /u/pico-sh)"
 32	r.Header.Set("User-Agent", userAgent)
 33	r.Header.Set("Accept", "*/*")
 34	return c.RoundTripper.RoundTrip(r)
 35}
 36
 37var httpClient = http.Client{
 38	Transport: &UserAgentTransport{
 39		&http.Transport{
 40			TLSClientConfig: &tls.Config{},
 41		},
 42	},
 43}
 44
// FeedItemTmpl is the per-item data handed to the digest templates.
//
// Content and Description are typed as html.HTML, which html/template emits
// verbatim instead of escaping. GUID is declared here but is not populated by
// itemToTemplate.
type FeedItemTmpl struct {
	GUID        string
	Title       string
	Link        string
	PublishedAt *time.Time
	Content     html.HTML
	Description html.HTML
}
 53
// Feed is one fetched feed as seen by the digest templates.
//
// Items holds the template-ready form of each new item and FeedItems holds
// the corresponding raw gofeed items; Fetch appends to both in lockstep, so
// they describe the same entries in the same order.
type Feed struct {
	Title       string
	Link        string
	Description string
	Items       []*FeedItemTmpl
	FeedItems   []*gofeed.Item
}
 61
// DigestFeed is the root value passed to the digest templates.
//
// As filled in by FetchAll: KeepAliveURL and UnsubURL are per-post links,
// DaysLeft is the number of days until the post expires rendered as a string
// (empty when the post has no expiry), and ShowBanner is set when 30 or fewer
// days remain.
type DigestFeed struct {
	Feeds        []*Feed
	Options      DigestOptions
	KeepAliveURL string
	UnsubURL     string
	DaysLeft     string
	ShowBanner   bool
}
 70
// DigestOptions carries rendering switches exposed to the digest templates.
// InlineContent is copied from the inlineContent argument of FetchAll.
type DigestOptions struct {
	InlineContent bool
}
 74
 75func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
 76	return &FeedItemTmpl{
 77		Title:       item.Title,
 78		Link:        item.Link,
 79		PublishedAt: item.PublishedParsed,
 80		Description: html.HTML(item.Description),
 81		Content:     html.HTML(item.Content),
 82	}
 83}
 84
 85func DigestOptionToTime(lastDigest time.Time, interval string) time.Time {
 86	day := 24 * time.Hour
 87	if interval == "10min" {
 88		return lastDigest.Add(10 * time.Minute)
 89	} else if interval == "1hour" {
 90		return lastDigest.Add(1 * time.Hour)
 91	} else if interval == "6hour" {
 92		return lastDigest.Add(6 * time.Hour)
 93	} else if interval == "12hour" {
 94		return lastDigest.Add(12 * time.Hour)
 95	} else if interval == "1day" || interval == "" {
 96		return lastDigest.Add(1 * day)
 97	} else if interval == "7day" {
 98		return lastDigest.Add(7 * day)
 99	} else if interval == "30day" {
100		return lastDigest.Add(30 * day)
101	} else {
102		return lastDigest
103	}
104}
105
106func getFeedItemID(logger *slog.Logger, item *gofeed.Item) string {
107	guid := item.GUID
108	if item.GUID == "" {
109		logger.Info("no <guid> found for feed item, using <link> instead for its unique id")
110		return item.Link
111	}
112	return guid
113}
114
115// see if this feed item should be emailed to user.
116func isValidItem(logger *slog.Logger, item *gofeed.Item, feedItems []*db.FeedItem) bool {
117	for _, feedItem := range feedItems {
118		if getFeedItemID(logger, item) == feedItem.GUID {
119			return false
120		}
121	}
122
123	return true
124}
125
// Fetcher runs the feed digest job: it loads feed posts from db, fetches the
// feeds they list, and emails the resulting digests. cfg supplies the logger,
// the template paths and the SendGrid key used by its methods.
type Fetcher struct {
	cfg *shared.ConfigSite
	db  db.DB
}
130
131func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
132	return &Fetcher{
133		db:  dbpool,
134		cfg: cfg,
135	}
136}
137
138func (f *Fetcher) Validate(post *db.Post, parsed *shared.ListParsedText) error {
139	lastDigest := post.Data.LastDigest
140	if lastDigest == nil {
141		return nil
142	}
143
144	now := time.Now().UTC()
145
146	expiresAt := post.ExpiresAt
147	if expiresAt != nil {
148		if post.ExpiresAt.Before(now) {
149			return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
150		}
151	}
152
153	digestAt := DigestOptionToTime(*lastDigest, parsed.DigestInterval)
154	if digestAt.After(now) {
155		return fmt.Errorf("(%s) not time to digest, skipping", digestAt.Format(time.RFC3339))
156	}
157	return nil
158}
159
160func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post, skipValidation bool) error {
161	logger = logger.With("filename", post.Filename)
162	logger.Info("running feed post")
163
164	parsed := shared.ListParseText(post.Text)
165
166	if parsed.Email == "" {
167		logger.Error("post does not have an email associated, removing post")
168		err := f.db.RemovePosts([]string{post.ID})
169		if err != nil {
170			return err
171		}
172	}
173
174	logger.Info("last digest at", "lastDigest", post.Data.LastDigest.Format(time.RFC3339))
175	err := f.Validate(post, parsed)
176	if err != nil {
177		logger.Info("validation failed", "err", err)
178		if skipValidation {
179			logger.Info("overriding validation error, continuing")
180		} else {
181			return nil
182		}
183	}
184
185	urls := []string{}
186	for _, item := range parsed.Items {
187		u := ""
188		if item.IsText || item.IsURL {
189			u = item.Value
190		} else if item.IsURL {
191			u = string(item.Value)
192		}
193
194		if u == "" {
195			continue
196		}
197
198		_, err := url.Parse(string(item.URL))
199		if err != nil {
200			logger.Info("invalid url", "url", string(item.URL))
201			continue
202		}
203
204		logger.Info("found rss feed url", "url", u)
205		urls = append(urls, u)
206	}
207
208	now := time.Now().UTC()
209	if post.ExpiresAt == nil {
210		expiresAt := time.Now().AddDate(0, 12, 0)
211		post.ExpiresAt = &expiresAt
212	}
213	_, err = f.db.UpdatePost(post)
214	if err != nil {
215		return err
216	}
217
218	subject := fmt.Sprintf("%s feed digest", post.Title)
219
220	msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, post)
221	if err != nil {
222		errForUser := err
223
224		// we don't want to increment in this case
225		if errors.Is(errForUser, ErrNoRecentArticles) {
226			return nil
227		}
228
229		post.Data.Attempts += 1
230		logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)
231
232		maxAttempts := 10
233		errBody := fmt.Sprintf(`There was an error attempting to fetch your feeds (%d) times.  After (%d) attempts we remove the file from our system.  Please check all the URLs and re-upload.
234Also, we have centralized logs in our pico.sh TUI that will display realtime feed errors so you can debug.
235
236
237%s
238
239
240%s`, post.Data.Attempts, maxAttempts, errForUser.Error(), post.Text)
241		err = f.SendEmail(
242			logger, user.Name,
243			parsed.Email,
244			subject,
245			&MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
246		)
247		if err != nil {
248			return err
249		}
250
251		if post.Data.Attempts >= maxAttempts {
252			err = f.db.RemovePosts([]string{post.ID})
253			if err != nil {
254				return err
255			}
256		} else {
257			_, err = f.db.UpdatePost(post)
258			if err != nil {
259				return err
260			}
261		}
262		return errForUser
263	} else {
264		post.Data.Attempts = 0
265		_, err := f.db.UpdatePost(post)
266		if err != nil {
267			return err
268		}
269	}
270
271	if msgBody != nil {
272		err = f.SendEmail(logger, user.Name, parsed.Email, subject, msgBody)
273		if err != nil {
274			return err
275		}
276	}
277
278	post.Data.LastDigest = &now
279	_, err = f.db.UpdatePost(post)
280	if err != nil {
281		return err
282	}
283
284	return nil
285}
286
287func (f *Fetcher) RunUser(user *db.User) error {
288	logger := shared.LoggerWithUser(f.cfg.Logger, user)
289	posts, err := f.db.FindPostsForUser(&db.Pager{Num: 100}, user.ID, "feeds")
290	if err != nil {
291		return err
292	}
293
294	if len(posts.Data) > 0 {
295		logger.Info("found feed posts", "len", len(posts.Data))
296	}
297
298	for _, post := range posts.Data {
299		err = f.RunPost(logger, user, post, false)
300		if err != nil {
301			logger.Error("run post failed", "err", err)
302		}
303	}
304
305	return nil
306}
307
308func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
309	req, err := http.NewRequest("GET", url, nil)
310	if err != nil {
311		return nil, err
312	}
313
314	resp, err := httpClient.Do(req)
315	if err != nil {
316		return nil, err
317	}
318
319	defer resp.Body.Close()
320	body, err := io.ReadAll(resp.Body)
321	if err != nil {
322		return nil, err
323	}
324
325	if resp.StatusCode < 200 || resp.StatusCode > 300 {
326		return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
327	}
328
329	feed, err := fp.ParseString(string(body))
330	if err != nil {
331		return nil, err
332	}
333
334	return feed, nil
335}
336
337func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
338	logger.Info("fetching feed", "url", url)
339
340	feed, err := f.ParseURL(fp, url)
341	if err != nil {
342		return nil, err
343	}
344
345	feedTmpl := &Feed{
346		Title:       feed.Title,
347		Description: feed.Description,
348		Link:        feed.Link,
349	}
350
351	items := []*FeedItemTmpl{}
352	gofeedItems := []*gofeed.Item{}
353	// we only want to return feed items published since the last digest time we fetched
354	for _, item := range feed.Items {
355		if item == nil {
356			continue
357		}
358
359		if !isValidItem(logger, item, feedItems) {
360			logger.Info("feed item already served", "guid", item.GUID)
361			continue
362		}
363
364		gofeedItems = append(gofeedItems, item)
365		items = append(items, itemToTemplate(item))
366	}
367
368	if len(items) == 0 {
369		return nil, fmt.Errorf(
370			"%s %w, skipping",
371			url,
372			ErrNoRecentArticles,
373		)
374	}
375
376	feedTmpl.FeedItems = gofeedItems
377	feedTmpl.Items = items
378	return feedTmpl, nil
379}
380
381func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
382	ts, err := template.ParseFiles(
383		f.cfg.StaticPath("html/digest_text.page.tmpl"),
384	)
385
386	if err != nil {
387		return "", err
388	}
389
390	w := new(strings.Builder)
391	err = ts.Execute(w, feedTmpl)
392	if err != nil {
393		return "", err
394	}
395
396	return w.String(), nil
397}
398
399func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
400	ts, err := html.ParseFiles(
401		f.cfg.StaticPath("html/digest.page.tmpl"),
402	)
403
404	if err != nil {
405		return "", err
406	}
407
408	w := new(strings.Builder)
409	err = ts.Execute(w, feedTmpl)
410	if err != nil {
411		return "", err
412	}
413
414	return w.String(), nil
415}
416
// MsgBody holds the two renderings of one email: Html for the HTML part and
// Text for the plain-text part. SendEmail passes both to the mail helper.
type MsgBody struct {
	Html string
	Text string
}
421
422func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, post *db.Post) (*MsgBody, error) {
423	logger.Info("fetching feeds", "inlineContent", inlineContent)
424	fp := gofeed.NewParser()
425	daysLeft := ""
426	showBanner := false
427	if post.ExpiresAt != nil {
428		diff := time.Until(*post.ExpiresAt)
429		daysLeftInt := int(math.Ceil(diff.Hours() / 24))
430		daysLeft = fmt.Sprintf("%d", daysLeftInt)
431		if daysLeftInt <= 30 {
432			showBanner = true
433		}
434	}
435	feeds := &DigestFeed{
436		KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
437		UnsubURL:     fmt.Sprintf("https://feeds.pico.sh/unsub/%s", post.ID),
438		DaysLeft:     daysLeft,
439		ShowBanner:   showBanner,
440		Options:      DigestOptions{InlineContent: inlineContent},
441	}
442	feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
443	if err != nil {
444		return nil, err
445	}
446
447	if len(urls) == 0 {
448		return nil, fmt.Errorf("feed file does not contain any urls")
449	}
450
451	var allErrors error
452	for _, url := range urls {
453		feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
454		if err != nil {
455			if errors.Is(err, ErrNoRecentArticles) {
456				logger.Info("no recent articles", "err", err)
457			} else {
458				allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
459				logger.Error("fetch error", "err", err)
460			}
461			continue
462		}
463		feeds.Feeds = append(feeds.Feeds, feedTmpl)
464	}
465
466	if len(feeds.Feeds) == 0 {
467		if allErrors != nil {
468			return nil, allErrors
469		}
470		return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
471	}
472
473	fdi := []*db.FeedItem{}
474	for _, feed := range feeds.Feeds {
475		for _, item := range feed.FeedItems {
476			uid := getFeedItemID(logger, item)
477			fdi = append(fdi, &db.FeedItem{
478				PostID: post.ID,
479				GUID:   uid,
480				Data: db.FeedItemData{
481					Title:       item.Title,
482					Description: item.Description,
483					Content:     item.Content,
484					Link:        item.Link,
485					PublishedAt: item.PublishedParsed,
486				},
487			})
488		}
489	}
490	err = f.db.InsertFeedItems(post.ID, fdi)
491	if err != nil {
492		return nil, err
493	}
494
495	text, err := f.PrintText(feeds)
496	if err != nil {
497		return nil, err
498	}
499
500	html, err := f.PrintHtml(feeds)
501	if err != nil {
502		return nil, err
503	}
504
505	if allErrors != nil {
506		text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
507		html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
508	}
509
510	return &MsgBody{
511		Text: text,
512		Html: html,
513	}, nil
514}
515
516func (f *Fetcher) SendEmail(logger *slog.Logger, username, email string, subject string, msg *MsgBody) error {
517	if email == "" {
518		return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
519	}
520
521	from := mail.NewEmail("team pico", shared.DefaultEmail)
522	to := mail.NewEmail(username, email)
523
524	// f.logger.Infof("message body (%s)", plainTextContent)
525
526	message := mail.NewSingleEmail(from, subject, to, msg.Text, msg.Html)
527	client := sendgrid.NewSendClient(f.cfg.SendgridKey)
528
529	logger.Info("sending email digest")
530	response, err := client.Send(message)
531	if err != nil {
532		return err
533	}
534
535	// f.logger.Infof("(%s) email digest response: %v", username, response)
536
537	if len(response.Headers["X-Message-Id"]) > 0 {
538		logger.Info(
539			"successfully sent email digest",
540			"email", email,
541			"x-message-id", response.Headers["X-Message-Id"][0],
542		)
543	} else {
544		logger.Error(
545			"could not find x-message-id, which means sending an email failed",
546			"email", email,
547		)
548	}
549
550	return nil
551}
552
553func (f *Fetcher) Run(logger *slog.Logger) error {
554	users, err := f.db.FindUsers()
555	if err != nil {
556		return err
557	}
558
559	for _, user := range users {
560		err := f.RunUser(user)
561		if err != nil {
562			logger.Error("run user failed", "err", err)
563			continue
564		}
565	}
566
567	return nil
568}
569
570func (f *Fetcher) Loop() {
571	logger := f.cfg.Logger
572	for {
573		logger.Info("running digest emailer")
574
575		err := f.Run(logger)
576		if err != nil {
577			logger.Error("run failed", "err", err)
578		}
579
580		logger.Info("digest emailer finished, waiting 10 mins")
581		time.Sleep(10 * time.Minute)
582	}
583}