repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / feeds
Eric Bower  ·  2025-05-25

cron.go

  1package feeds
  2
  3import (
  4	"crypto/tls"
  5	"errors"
  6	"fmt"
  7	html "html/template"
  8	"io"
  9	"log/slog"
 10	"math"
 11	"net/http"
 12	"net/url"
 13	"strings"
 14	"text/template"
 15	"time"
 16
 17	"github.com/mmcdole/gofeed"
 18	"github.com/picosh/pico/pkg/db"
 19	"github.com/picosh/pico/pkg/shared"
 20	"github.com/sendgrid/sendgrid-go"
 21	"github.com/sendgrid/sendgrid-go/helpers/mail"
 22)
 23
// ErrNoRecentArticles is the sentinel wrapped by Fetch and FetchAll when a
// feed (or every feed of a post) yields no items that still need to be sent.
// Callers detect it with errors.Is and treat it as "nothing to do".
var ErrNoRecentArticles = errors.New("no recent articles")
 25
 26type UserAgentTransport struct {
 27	http.RoundTripper
 28}
 29
 30func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
 31	userAgent := "linux:feeds:v2 (by /u/pico-sh)"
 32	r.Header.Set("User-Agent", userAgent)
 33	r.Header.Set("Accept", "*/*")
 34	return c.RoundTripper.RoundTrip(r)
 35}
 36
 37var httpClient = http.Client{
 38	Transport: &UserAgentTransport{
 39		&http.Transport{
 40			TLSClientConfig: &tls.Config{},
 41		},
 42	},
 43}
 44
// FeedItemTmpl is the per-article view model built by itemToTemplate from a
// gofeed.Item.
type FeedItemTmpl struct {
	GUID        string
	Title       string
	Link        string
	PublishedAt *time.Time
	// Content and Description are typed html.HTML (html/template), which the
	// html template engine emits verbatim instead of escaping.
	Content     html.HTML
	Description html.HTML
}
 53
// Feed is one fetched feed together with the items that still need to be
// delivered.
type Feed struct {
	Title       string
	Link        string
	Description string
	// Items holds the template view of each new item.
	Items []*FeedItemTmpl
	// FeedItems holds the same items in their parsed gofeed form; FetchAll
	// reads these to record which items have been served.
	FeedItems []*gofeed.Item
}
 61
// DigestFeed is the root value passed to the digest templates by PrintText
// and PrintHtml.
type DigestFeed struct {
	Feeds   []*Feed
	Options DigestOptions
	// KeepAliveURL and UnsubURL are built by FetchAll from the post ID.
	KeepAliveURL string
	UnsubURL     string
	// DaysLeft is the number of days until the post expires, formatted as a
	// decimal string; it is empty when the post has no expiry.
	DaysLeft string
	// ShowBanner is set by FetchAll when 30 or fewer days remain.
	ShowBanner bool
}
 70
// DigestOptions carries per-post rendering options into the digest templates.
type DigestOptions struct {
	// InlineContent is copied from the parsed post settings.
	// NOTE(review): presumably makes the templates embed article content in
	// the email — confirm against the template files.
	InlineContent bool
}
 74
 75func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
 76	return &FeedItemTmpl{
 77		Title:       item.Title,
 78		Link:        item.Link,
 79		PublishedAt: item.PublishedParsed,
 80		Description: html.HTML(item.Description),
 81		Content:     html.HTML(item.Content),
 82	}
 83}
 84
 85func DigestOptionToTime(lastDigest time.Time, interval string) time.Time {
 86	day := 24 * time.Hour
 87	switch interval {
 88	case "10min":
 89		return lastDigest.Add(10 * time.Minute)
 90	case "1hour":
 91		return lastDigest.Add(1 * time.Hour)
 92	case "6hour":
 93		return lastDigest.Add(6 * time.Hour)
 94	case "12hour":
 95		return lastDigest.Add(12 * time.Hour)
 96	case "1day", "":
 97		return lastDigest.Add(1 * day)
 98	case "7day":
 99		return lastDigest.Add(7 * day)
100	case "30day":
101		return lastDigest.Add(30 * day)
102	default:
103		return lastDigest
104	}
105}
106
107func getFeedItemID(logger *slog.Logger, item *gofeed.Item) string {
108	guid := item.GUID
109	if item.GUID == "" {
110		logger.Info("no <guid> found for feed item, using <link> instead for its unique id")
111		return item.Link
112	}
113	return guid
114}
115
116// see if this feed item should be emailed to user.
117func isValidItem(logger *slog.Logger, item *gofeed.Item, feedItems []*db.FeedItem) bool {
118	for _, feedItem := range feedItems {
119		if getFeedItemID(logger, item) == feedItem.GUID {
120			return false
121		}
122	}
123
124	return true
125}
126
// Fetcher downloads the feeds listed in users' feed posts and emails the new
// items as digests.
type Fetcher struct {
	// cfg supplies the logger, the static template path and the SendGrid key.
	cfg *shared.ConfigSite
	// db stores posts and the feed items already served for each post.
	db db.DB
}
131
132func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
133	return &Fetcher{
134		db:  dbpool,
135		cfg: cfg,
136	}
137}
138
139func (f *Fetcher) Validate(post *db.Post, parsed *shared.ListParsedText) error {
140	lastDigest := post.Data.LastDigest
141	if lastDigest == nil {
142		return nil
143	}
144
145	now := time.Now().UTC()
146
147	expiresAt := post.ExpiresAt
148	if expiresAt != nil {
149		if post.ExpiresAt.Before(now) {
150			return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
151		}
152	}
153
154	digestAt := DigestOptionToTime(*lastDigest, parsed.DigestInterval)
155	if digestAt.After(now) {
156		return fmt.Errorf("(%s) not time to digest, skipping", digestAt.Format(time.RFC3339))
157	}
158	return nil
159}
160
161func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post, skipValidation bool) error {
162	logger = logger.With("filename", post.Filename)
163	logger.Info("running feed post")
164
165	parsed := shared.ListParseText(post.Text)
166
167	if parsed.Email == "" {
168		logger.Error("post does not have an email associated, removing post")
169		err := f.db.RemovePosts([]string{post.ID})
170		if err != nil {
171			return err
172		}
173	}
174
175	logger.Info("last digest at", "lastDigest", post.Data.LastDigest.Format(time.RFC3339))
176	err := f.Validate(post, parsed)
177	if err != nil {
178		logger.Info("validation failed", "err", err)
179		if skipValidation {
180			logger.Info("overriding validation error, continuing")
181		} else {
182			return nil
183		}
184	}
185
186	urls := []string{}
187	for _, item := range parsed.Items {
188		u := ""
189		if item.IsText || item.IsURL {
190			u = item.Value
191		} else if item.IsURL {
192			u = string(item.Value)
193		}
194
195		if u == "" {
196			continue
197		}
198
199		_, err := url.Parse(string(item.URL))
200		if err != nil {
201			logger.Info("invalid url", "url", string(item.URL))
202			continue
203		}
204
205		logger.Info("found rss feed url", "url", u)
206		urls = append(urls, u)
207	}
208
209	now := time.Now().UTC()
210	if post.ExpiresAt == nil {
211		expiresAt := time.Now().AddDate(0, 12, 0)
212		post.ExpiresAt = &expiresAt
213	}
214	_, err = f.db.UpdatePost(post)
215	if err != nil {
216		return err
217	}
218
219	subject := fmt.Sprintf("%s feed digest", post.Title)
220
221	msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, post)
222	if err != nil {
223		errForUser := err
224
225		// we don't want to increment in this case
226		if errors.Is(errForUser, ErrNoRecentArticles) {
227			return nil
228		}
229
230		post.Data.Attempts += 1
231		logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)
232
233		maxAttempts := 10
234		errBody := fmt.Sprintf(`There was an error attempting to fetch your feeds (%d) times.  After (%d) attempts we remove the file from our system.  Please check all the URLs and re-upload.
235Also, we have centralized logs in our pico.sh TUI that will display realtime feed errors so you can debug.
236
237
238%s
239
240
241%s`, post.Data.Attempts, maxAttempts, errForUser.Error(), post.Text)
242		err = f.SendEmail(
243			logger, user.Name,
244			parsed.Email,
245			subject,
246			&MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
247		)
248		if err != nil {
249			return err
250		}
251
252		if post.Data.Attempts >= maxAttempts {
253			err = f.db.RemovePosts([]string{post.ID})
254			if err != nil {
255				return err
256			}
257		} else {
258			_, err = f.db.UpdatePost(post)
259			if err != nil {
260				return err
261			}
262		}
263		return errForUser
264	} else {
265		post.Data.Attempts = 0
266		_, err := f.db.UpdatePost(post)
267		if err != nil {
268			return err
269		}
270	}
271
272	if msgBody != nil {
273		err = f.SendEmail(logger, user.Name, parsed.Email, subject, msgBody)
274		if err != nil {
275			return err
276		}
277	}
278
279	post.Data.LastDigest = &now
280	_, err = f.db.UpdatePost(post)
281	if err != nil {
282		return err
283	}
284
285	return nil
286}
287
288func (f *Fetcher) RunUser(user *db.User) error {
289	logger := shared.LoggerWithUser(f.cfg.Logger, user)
290	posts, err := f.db.FindPostsForUser(&db.Pager{Num: 100}, user.ID, "feeds")
291	if err != nil {
292		return err
293	}
294
295	if len(posts.Data) > 0 {
296		logger.Info("found feed posts", "len", len(posts.Data))
297	}
298
299	for _, post := range posts.Data {
300		err = f.RunPost(logger, user, post, false)
301		if err != nil {
302			logger.Error("run post failed", "err", err)
303		}
304	}
305
306	return nil
307}
308
309func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
310	req, err := http.NewRequest("GET", url, nil)
311	if err != nil {
312		return nil, err
313	}
314
315	resp, err := httpClient.Do(req)
316	if err != nil {
317		return nil, err
318	}
319
320	defer func() {
321		_ = resp.Body.Close()
322	}()
323	body, err := io.ReadAll(resp.Body)
324	if err != nil {
325		return nil, err
326	}
327
328	if resp.StatusCode < 200 || resp.StatusCode > 300 {
329		return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
330	}
331
332	feed, err := fp.ParseString(string(body))
333	if err != nil {
334		return nil, err
335	}
336
337	return feed, nil
338}
339
340func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
341	logger.Info("fetching feed", "url", url)
342
343	feed, err := f.ParseURL(fp, url)
344	if err != nil {
345		return nil, err
346	}
347
348	feedTmpl := &Feed{
349		Title:       feed.Title,
350		Description: feed.Description,
351		Link:        feed.Link,
352	}
353
354	items := []*FeedItemTmpl{}
355	gofeedItems := []*gofeed.Item{}
356	// we only want to return feed items published since the last digest time we fetched
357	for _, item := range feed.Items {
358		if item == nil {
359			continue
360		}
361
362		if !isValidItem(logger, item, feedItems) {
363			logger.Info("feed item already served", "guid", item.GUID)
364			continue
365		}
366
367		gofeedItems = append(gofeedItems, item)
368		items = append(items, itemToTemplate(item))
369	}
370
371	if len(items) == 0 {
372		return nil, fmt.Errorf(
373			"%s %w, skipping",
374			url,
375			ErrNoRecentArticles,
376		)
377	}
378
379	feedTmpl.FeedItems = gofeedItems
380	feedTmpl.Items = items
381	return feedTmpl, nil
382}
383
384func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
385	ts, err := template.ParseFiles(
386		f.cfg.StaticPath("html/digest_text.page.tmpl"),
387	)
388
389	if err != nil {
390		return "", err
391	}
392
393	w := new(strings.Builder)
394	err = ts.Execute(w, feedTmpl)
395	if err != nil {
396		return "", err
397	}
398
399	return w.String(), nil
400}
401
402func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
403	ts, err := html.ParseFiles(
404		f.cfg.StaticPath("html/digest.page.tmpl"),
405	)
406
407	if err != nil {
408		return "", err
409	}
410
411	w := new(strings.Builder)
412	err = ts.Execute(w, feedTmpl)
413	if err != nil {
414		return "", err
415	}
416
417	return w.String(), nil
418}
419
// MsgBody holds the two renderings of one email: the HTML part and the
// plain-text part. SendEmail passes both to the mail client.
type MsgBody struct {
	Html string
	Text string
}
424
425func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, post *db.Post) (*MsgBody, error) {
426	logger.Info("fetching feeds", "inlineContent", inlineContent)
427	fp := gofeed.NewParser()
428	daysLeft := ""
429	showBanner := false
430	if post.ExpiresAt != nil {
431		diff := time.Until(*post.ExpiresAt)
432		daysLeftInt := int(math.Ceil(diff.Hours() / 24))
433		daysLeft = fmt.Sprintf("%d", daysLeftInt)
434		if daysLeftInt <= 30 {
435			showBanner = true
436		}
437	}
438	feeds := &DigestFeed{
439		KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
440		UnsubURL:     fmt.Sprintf("https://feeds.pico.sh/unsub/%s", post.ID),
441		DaysLeft:     daysLeft,
442		ShowBanner:   showBanner,
443		Options:      DigestOptions{InlineContent: inlineContent},
444	}
445	feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
446	if err != nil {
447		return nil, err
448	}
449
450	if len(urls) == 0 {
451		return nil, fmt.Errorf("feed file does not contain any urls")
452	}
453
454	var allErrors error
455	for _, url := range urls {
456		feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
457		if err != nil {
458			if errors.Is(err, ErrNoRecentArticles) {
459				logger.Info("no recent articles", "err", err)
460			} else {
461				allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
462				logger.Error("fetch error", "err", err)
463			}
464			continue
465		}
466		feeds.Feeds = append(feeds.Feeds, feedTmpl)
467	}
468
469	if len(feeds.Feeds) == 0 {
470		if allErrors != nil {
471			return nil, allErrors
472		}
473		return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
474	}
475
476	fdi := []*db.FeedItem{}
477	for _, feed := range feeds.Feeds {
478		for _, item := range feed.FeedItems {
479			uid := getFeedItemID(logger, item)
480			fdi = append(fdi, &db.FeedItem{
481				PostID: post.ID,
482				GUID:   uid,
483				Data: db.FeedItemData{
484					Title:       item.Title,
485					Description: item.Description,
486					Content:     item.Content,
487					Link:        item.Link,
488					PublishedAt: item.PublishedParsed,
489				},
490			})
491		}
492	}
493	err = f.db.InsertFeedItems(post.ID, fdi)
494	if err != nil {
495		return nil, err
496	}
497
498	text, err := f.PrintText(feeds)
499	if err != nil {
500		return nil, err
501	}
502
503	html, err := f.PrintHtml(feeds)
504	if err != nil {
505		return nil, err
506	}
507
508	if allErrors != nil {
509		text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
510		html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
511	}
512
513	return &MsgBody{
514		Text: text,
515		Html: html,
516	}, nil
517}
518
519func (f *Fetcher) SendEmail(logger *slog.Logger, username, email string, subject string, msg *MsgBody) error {
520	if email == "" {
521		return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
522	}
523
524	from := mail.NewEmail("team pico", shared.DefaultEmail)
525	to := mail.NewEmail(username, email)
526
527	// f.logger.Infof("message body (%s)", plainTextContent)
528
529	message := mail.NewSingleEmail(from, subject, to, msg.Text, msg.Html)
530	client := sendgrid.NewSendClient(f.cfg.SendgridKey)
531
532	logger.Info("sending email digest")
533	response, err := client.Send(message)
534	if err != nil {
535		return err
536	}
537
538	// f.logger.Infof("(%s) email digest response: %v", username, response)
539
540	if len(response.Headers["X-Message-Id"]) > 0 {
541		logger.Info(
542			"successfully sent email digest",
543			"email", email,
544			"x-message-id", response.Headers["X-Message-Id"][0],
545		)
546	} else {
547		logger.Error(
548			"could not find x-message-id, which means sending an email failed",
549			"email", email,
550		)
551	}
552
553	return nil
554}
555
556func (f *Fetcher) Run(logger *slog.Logger) error {
557	users, err := f.db.FindUsers()
558	if err != nil {
559		return err
560	}
561
562	for _, user := range users {
563		err := f.RunUser(user)
564		if err != nil {
565			logger.Error("run user failed", "err", err)
566			continue
567		}
568	}
569
570	return nil
571}
572
573func (f *Fetcher) Loop() {
574	logger := f.cfg.Logger
575	for {
576		logger.Info("running digest emailer")
577
578		err := f.Run(logger)
579		if err != nil {
580			logger.Error("run failed", "err", err)
581		}
582
583		logger.Info("digest emailer finished, waiting 10 mins")
584		time.Sleep(10 * time.Minute)
585	}
586}