pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / feeds
Eric Bower  ·  2025-08-30

cron.go

package feeds

import (
	"crypto/tls"
	"errors"
	"fmt"
	html "html/template"
	"io"
	"log/slog"
	"math"
	"net/http"
	"net/url"
	"os"
	"strings"
	"text/template"
	"time"

	"github.com/adhocore/gronx"
	"github.com/emersion/go-sasl"
	"github.com/emersion/go-smtp"
	"github.com/mmcdole/gofeed"
	"github.com/picosh/pico/pkg/db"
	"github.com/picosh/pico/pkg/shared"
	"github.com/picosh/utils"
)

var ErrNoRecentArticles = errors.New("no recent articles")

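// UserAgentTransport wraps an http.RoundTripper so every outgoing request is
// stamped with a descriptive User-Agent, letting feed hosts identify this crawler.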
type UserAgentTransport struct {
	http.RoundTripper
}

func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	userAgent := "linux:feeds:v2 (by /u/pico-sh)"
	r.Header.Set("User-Agent", userAgent)
	r.Header.Set("Accept", "*/*")
	return c.RoundTripper.RoundTrip(r)
}

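// httpClient is the shared HTTP client used for all feed fetches.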
var httpClient = http.Client{
	Transport: &UserAgentTransport{
		&http.Transport{
			TLSClientConfig: &tls.Config{},
		},
	},
}

type FeedItemTmpl struct {
	GUID        string
	Title       string
	Link        string
	PublishedAt *time.Time
	Content     html.HTML
	Description html.HTML
}

type Feed struct {
	Title       string
	Link        string
	Description string
	Items       []*FeedItemTmpl
	FeedItems   []*gofeed.Item
}

type DigestFeed struct {
	Feeds        []*Feed
	Options      DigestOptions
	KeepAliveURL string
	UnsubURL     string
	DaysLeft     string
	ShowBanner   bool
	SizeWarning  bool
}

type DigestOptions struct {
	InlineContent bool
}

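// itemToTemplate converts a parsed gofeed item into the struct consumed by
// the digest email templates.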
func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
	return &FeedItemTmpl{
		Title:       item.Title,
		Link:        item.Link,
		PublishedAt: item.PublishedParsed,
		Description: html.HTML(item.Description),
		Content:     html.HTML(item.Content),
	}
}

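// DigestIntervalToCron maps the deprecated `digest_interval` values to cron
// expressions; empty or unknown intervals fall back to a daily run at 13:00.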
func DigestIntervalToCron(interval string) string {
	switch interval {
	case "10min":
		return "*/10 * * * *"
	case "1hour":
		return "0 * * * *"
	case "6hour":
		return "0 */6 * * *"
	case "12hour":
		return "0 */12 * * *"
	case "1day", "":
		return "0 13 * * *"
	case "7day":
		return "0 13 * * 0"
	case "30day":
		return "0 13 1 * *"
	default:
		return "0 13 * * *"
	}
}

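// getFeedItemID returns the stable identifier for a feed item: its <guid>
// when present, otherwise its <link>, sanitized to valid UTF-8.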
func getFeedItemID(logger *slog.Logger, item *gofeed.Item) string {
	guid := strings.ToValidUTF8(item.GUID, "")
	if item.GUID == "" {
		logger.Info("no <guid> found for feed item, using <link> instead for its unique id")
		return strings.ToValidUTF8(item.Link, "")
	}
	return guid
}

// isValidItem reports whether a feed item should be emailed to the user,
// i.e. it has not already been recorded as served for this post.
func isValidItem(logger *slog.Logger, item *gofeed.Item, feedItems []*db.FeedItem) bool {
	for _, feedItem := range feedItems {
		if getFeedItemID(logger, item) == feedItem.GUID {
			return false
		}
	}

	return true
}

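// Fetcher walks every user's feed posts, fetches the referenced RSS/Atom
// feeds, and emails the rendered digests over SMTP.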
type Fetcher struct {
	cfg  *shared.ConfigSite
	db   db.DB
	auth sasl.Client
	gron *gronx.Gronx
	host string
}

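// NewFetcher builds a Fetcher, reading the outgoing mail account from the
// PICO_SMTP_HOST, PICO_SMTP_USER, and PICO_SMTP_PASS environment variables.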
func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
	host := os.Getenv("PICO_SMTP_HOST")
	smtpPass := os.Getenv("PICO_SMTP_PASS")
	emailLogin := os.Getenv("PICO_SMTP_USER")

	auth := sasl.NewPlainClient("", emailLogin, smtpPass)
	gron := gronx.New()
	return &Fetcher{
		db:   dbpool,
		cfg:  cfg,
		auth: auth,
		gron: gron,
		host: host,
	}
}

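// DateToMin truncates a time to minute precision, the granularity gronx
// expects when deciding whether a cron expression is due.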
func DateToMin(now time.Time) time.Time {
	return time.Date(
		now.Year(), now.Month(), now.Day(),
		now.Hour(), now.Minute(),
		0, 0, // zero out second and nano-second for cron
		now.Location(),
	)
}

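// Validate checks that the post has not expired and that its cron schedule
// (or deprecated digest_interval) is valid and currently due.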
func (f *Fetcher) Validate(logger *slog.Logger, post *db.Post, parsed *shared.ListParsedText, now time.Time) error {
	expiresAt := post.ExpiresAt
	if expiresAt != nil {
		if post.ExpiresAt.Before(now) {
			return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
		}
	}

	cron := parsed.Cron
	// support for posts with deprecated `digest_interval` property
	if parsed.DigestInterval != "" {
		cron = DigestIntervalToCron(parsed.DigestInterval)
	}
	logger.Info("found cron", "cron", cron)

	if !f.gron.IsValid(cron) {
		return fmt.Errorf("(%s) is an invalid `cron` expression, skipping", cron)
	}

	dt := DateToMin(now)
	isDue, err := f.gron.IsDue(cron, dt)
	if err != nil {
		return fmt.Errorf("cron error, skipping; err: %w", err)
	}
	if !isDue {
		nextTime, _ := gronx.NextTickAfter(cron, dt, true)
		return fmt.Errorf(
			"cron is not due yet, skipping digest; cur run: %s, next run: %s",
			f.gron.C.GetRef(),
			nextTime,
		)
	}
	return nil
}

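// RunPost processes a single feed post: it validates the schedule, fetches every
// feed URL listed in the post, and emails the digest. Failed runs increment an
// attempt counter; after ten failed attempts the post is removed.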
func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post, skipValidation bool, now time.Time) error {
	logger = logger.With("filename", post.Filename)
	logger.Info("running feed post")

	parsed := shared.ListParseText(post.Text)

	if parsed.Email == "" {
		logger.Error("post does not have an email associated, removing post")
		err := f.db.RemovePosts([]string{post.ID})
		if err != nil {
			return err
		}
		// the post was removed, so there is nothing left to digest
		return nil
	}

	if post.Data.LastDigest != nil {
		logger.Info("last digest", "at", post.Data.LastDigest.Format(time.RFC3339))
	} else {
		logger.Info("first time running post")
	}
	err := f.Validate(logger, post, parsed, now)
	if err != nil {
		logger.Info("validation failed", "err", err)
		if skipValidation {
			logger.Info("overriding validation error, continuing")
		} else {
			return nil
		}
	}
	logger.Info("validation success")

	urls := []string{}
	for _, item := range parsed.Items {
		u := ""
		if item.IsText || item.IsURL {
			u = item.Value
		}

		if u == "" {
			continue
		}

		_, err := url.Parse(string(item.URL))
		if err != nil {
			logger.Info("invalid url", "url", string(item.URL))
			continue
		}

		logger.Info("found rss feed url", "url", u)
		urls = append(urls, u)
	}

	if post.ExpiresAt == nil {
		expiresAt := now.AddDate(0, 12, 0)
		post.ExpiresAt = &expiresAt
	}
	_, err = f.db.UpdatePost(post)
	if err != nil {
		return err
	}

	subject := fmt.Sprintf("%s feed digest", post.Filename)
	unsubURL := getUnsubURL(post)

	msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, post)
	if err != nil {
		errForUser := err

		// we don't want to increment the attempt counter in this case
		if errors.Is(errForUser, ErrNoRecentArticles) {
			return nil
		}

		post.Data.Attempts += 1
		logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)

		maxAttempts := 10
		errBody := fmt.Sprintf(`We have failed to fetch your feeds (%d) times. After (%d) failed attempts we remove the file from our system. Please check all the URLs and re-upload.
We also have centralized logs in our pico.sh TUI that display real-time feed errors so you can debug.


%s


%s`, post.Data.Attempts, maxAttempts, errForUser.Error(), post.Text)
		err = f.SendEmail(
			logger,
			user.Name,
			parsed.Email,
			subject,
			unsubURL,
			&MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
		)
		if err != nil {
			return err
		}

		if post.Data.Attempts >= maxAttempts {
			err = f.db.RemovePosts([]string{post.ID})
			if err != nil {
				return err
			}
		} else {
			_, err = f.db.UpdatePost(post)
			if err != nil {
				return err
			}
		}
		return errForUser
	} else {
		post.Data.Attempts = 0
		_, err := f.db.UpdatePost(post)
		if err != nil {
			return err
		}
	}

	if msgBody != nil {
		err = f.SendEmail(logger, user.Name, parsed.Email, subject, unsubURL, msgBody)
		if err != nil {
			return err
		}
	}

	post.Data.LastDigest = &now
	_, err = f.db.UpdatePost(post)
	if err != nil {
		return err
	}

	return nil
}

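// RunUser loads up to 100 feed posts for a user and runs each one, logging
// (but not propagating) per-post failures.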
func (f *Fetcher) RunUser(user *db.User, now time.Time) error {
	logger := shared.LoggerWithUser(f.cfg.Logger, user)
	logger.Info("run user")
	posts, err := f.db.FindPostsForUser(&db.Pager{Num: 100}, user.ID, "feeds")
	if err != nil {
		return err
	}

	if len(posts.Data) > 0 {
		logger.Info("found feed posts", "len", len(posts.Data))
	}

	for _, post := range posts.Data {
		err = f.RunPost(logger, user, post, false, now)
		if err != nil {
			logger.Error("run post failed", "err", err)
		}
	}

	return nil
}

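// ParseURL downloads a feed with the custom user agent and parses the body
// with gofeed; non-2xx responses are returned as errors.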
func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		return nil, err
	}

	defer func() {
		_ = resp.Body.Close()
	}()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// only 2xx responses count as a successful fetch
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
	}

	feed, err := fp.ParseString(string(body))
	if err != nil {
		return nil, err
	}

	return feed, nil
}

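// Fetch retrieves a single feed and keeps only the items that have not been
// served for this post before; it returns ErrNoRecentArticles when nothing
// new is found.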
func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
	logger.Info("fetching feed", "url", url)

	feed, err := f.ParseURL(fp, url)
	if err != nil {
		return nil, err
	}

	feedTmpl := &Feed{
		Title:       feed.Title,
		Description: feed.Description,
		Link:        feed.Link,
	}

	items := []*FeedItemTmpl{}
	gofeedItems := []*gofeed.Item{}
	// we only want to return feed items that have not been served in a previous digest
	for _, item := range feed.Items {
		if item == nil {
			continue
		}

		if !isValidItem(logger, item, feedItems) {
			logger.Info("feed item already served", "guid", item.GUID)
			continue
		}

		gofeedItems = append(gofeedItems, item)
		items = append(items, itemToTemplate(item))
	}

	if len(items) == 0 {
		return nil, fmt.Errorf(
			"%s %w, skipping",
			url,
			ErrNoRecentArticles,
		)
	}

	feedTmpl.FeedItems = gofeedItems
	feedTmpl.Items = items
	return feedTmpl, nil
}

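// PrintText renders the plain-text digest using the digest_text.page.tmpl template.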
func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
	ts, err := template.ParseFiles(
		f.cfg.StaticPath("html/digest_text.page.tmpl"),
	)

	if err != nil {
		return "", err
	}

	w := new(strings.Builder)
	err = ts.Execute(w, feedTmpl)
	if err != nil {
		return "", err
	}

	return w.String(), nil
}

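// PrintHtml renders the HTML digest using the digest.page.tmpl template.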
func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
	ts, err := html.ParseFiles(
		f.cfg.StaticPath("html/digest.page.tmpl"),
	)

	if err != nil {
		return "", err
	}

	w := new(strings.Builder)
	err = ts.Execute(w, feedTmpl)
	if err != nil {
		return "", err
	}

	return w.String(), nil
}

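// MsgBody holds the HTML and plain-text alternatives of a digest email.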
type MsgBody struct {
	Html string
	Text string
}

func getUnsubURL(post *db.Post) string {
	return fmt.Sprintf("https://feeds.pico.sh/unsub/%s", post.ID)
}

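// FetchAll fetches every feed URL for a post, records the served items in the
// database, and renders the combined digest as both text and HTML. Bodies over
// 5MB are re-rendered without inline content and flagged with a size warning.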
func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, post *db.Post) (*MsgBody, error) {
	logger.Info("fetching feeds", "inlineContent", inlineContent)
	fp := gofeed.NewParser()
	daysLeft := ""
	showBanner := false
	if post.ExpiresAt != nil {
		diff := time.Until(*post.ExpiresAt)
		daysLeftInt := int(math.Ceil(diff.Hours() / 24))
		daysLeft = fmt.Sprintf("%d", daysLeftInt)
		if daysLeftInt <= 30 {
			showBanner = true
		}
	}
	feeds := &DigestFeed{
		KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
		UnsubURL:     getUnsubURL(post),
		DaysLeft:     daysLeft,
		ShowBanner:   showBanner,
		Options:      DigestOptions{InlineContent: inlineContent},
	}
	feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
	if err != nil {
		return nil, err
	}

	if len(urls) == 0 {
		return nil, fmt.Errorf("feed file does not contain any urls")
	}

	var allErrors error
	for _, url := range urls {
		feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
		if err != nil {
			if errors.Is(err, ErrNoRecentArticles) {
				logger.Info("no recent articles", "err", err)
			} else {
				allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
				logger.Error("fetch error", "err", err)
			}
			continue
		}
		feeds.Feeds = append(feeds.Feeds, feedTmpl)
	}

	if len(feeds.Feeds) == 0 {
		if allErrors != nil {
			return nil, allErrors
		}
		return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
	}

	fdi := []*db.FeedItem{}
	for _, feed := range feeds.Feeds {
		for _, item := range feed.FeedItems {
			uid := getFeedItemID(logger, item)
			fdi = append(fdi, &db.FeedItem{
				PostID: post.ID,
				GUID:   uid,
				Data: db.FeedItemData{
					Title:       strings.ToValidUTF8(item.Title, ""),
					Description: strings.ToValidUTF8(item.Description, ""),
					Content:     strings.ToValidUTF8(item.Content, ""),
					Link:        item.Link,
					PublishedAt: item.PublishedParsed,
				},
			})
		}
	}
	err = f.db.InsertFeedItems(post.ID, fdi)
	if err != nil {
		return nil, err
	}

	text, err := f.PrintText(feeds)
	if err != nil {
		return nil, err
	}

	html, err := f.PrintHtml(feeds)
	if err != nil {
		return nil, err
	}

	// cap body size to prevent abuse
	if len(html)+len(text) > 5*utils.MB {
		feeds.Options.InlineContent = false
		feeds.SizeWarning = true
		html, err = f.PrintHtml(feeds)
		if err != nil {
			return nil, err
		}
	}

	if allErrors != nil {
		text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
		html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
	}

	return &MsgBody{
		Text: text,
		Html: html,
	}, nil
}

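// SendEmail assembles a multipart/alternative message (plain text plus HTML)
// with a List-Unsubscribe header and sends it through the configured SMTP host.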
func (f *Fetcher) SendEmail(logger *slog.Logger, username, email, subject, unsubURL string, msg *MsgBody) error {
	if email == "" {
		return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
	}
	smtpAddr := f.host
	fromEmail := "hello@pico.sh"
	to := []string{email}
	headers := map[string]string{
		"From":             fromEmail,
		"Subject":          subject,
		"To":               email,
		"MIME-Version":     "1.0",
		"Content-Type":     `multipart/alternative; boundary="boundary123"`,
		"List-Unsubscribe": "<" + unsubURL + ">",
	}
	var content strings.Builder
	for k, v := range headers {
		content.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
	}
	content.WriteString("\r\n")
	content.WriteString("\r\n--boundary123\r\n")
	content.WriteString("Content-Type: text/plain; charset=\"utf-8\"\r\n")
	content.WriteString("\r\n" + msg.Text + "\r\n")
	content.WriteString("--boundary123\r\n")
	content.WriteString("Content-Type: text/html; charset=\"utf-8\"\r\n")
	content.WriteString("\r\n" + msg.Html + "\r\n")
	content.WriteString("--boundary123--")

	reader := strings.NewReader(content.String())
	logger.Info("sending email digest")
	err := smtp.SendMail(
		smtpAddr,
		f.auth,
		fromEmail,
		to,
		reader,
	)
	return err
}

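// Run executes a single digest pass over every user that has feed posts.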
func (f *Fetcher) Run(now time.Time) error {
	users, err := f.db.FindUsersWithPost("feeds")
	if err != nil {
		return err
	}

	for _, user := range users {
		err := f.RunUser(user, now)
		if err != nil {
			f.cfg.Logger.Error("run user failed", "err", err)
			continue
		}
	}

	return nil
}

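// Loop runs the digest emailer forever, checking for due posts once a minute.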
func (f *Fetcher) Loop() {
	logger := f.cfg.Logger
	for {
		logger.Info("running digest emailer")

		err := f.Run(time.Now().UTC())
		if err != nil {
			logger.Error("run failed", "err", err)
		}

		logger.Info("digest emailer finished, waiting 1min ...")
		time.Sleep(1 * time.Minute)
	}
}