repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / feeds
Eric Bower  ·  2026-01-25

cron.go

  1package feeds
  2
  3import (
  4	"crypto/tls"
  5	"errors"
  6	"fmt"
  7	html "html/template"
  8	"io"
  9	"log/slog"
 10	"math"
 11	"net/http"
 12	"net/url"
 13	"os"
 14	"strings"
 15	"text/template"
 16	"time"
 17
 18	"github.com/adhocore/gronx"
 19	"github.com/emersion/go-sasl"
 20	"github.com/emersion/go-smtp"
 21	"github.com/mmcdole/gofeed"
 22	"github.com/picosh/pico/pkg/db"
 23	"github.com/picosh/pico/pkg/shared"
 24)
 25
 26var ErrNoRecentArticles = errors.New("no recent articles")
 27
 28type UserAgentTransport struct {
 29	http.RoundTripper
 30}
 31
 32func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
 33	userAgent := "linux:feeds:v2 (by /u/pico-sh)"
 34	r.Header.Set("User-Agent", userAgent)
 35	r.Header.Set("Accept", "*/*")
 36	return c.RoundTripper.RoundTrip(r)
 37}
 38
 39var httpClient = http.Client{
 40	Transport: &UserAgentTransport{
 41		&http.Transport{
 42			TLSClientConfig: &tls.Config{},
 43		},
 44	},
 45}
 46
 47type FeedItemTmpl struct {
 48	GUID        string
 49	Title       string
 50	Link        string
 51	PublishedAt *time.Time
 52	Content     html.HTML
 53	Description html.HTML
 54}
 55
 56type Feed struct {
 57	Title       string
 58	Link        string
 59	Description string
 60	Items       []*FeedItemTmpl
 61	FeedItems   []*gofeed.Item
 62}
 63
 64type DigestFeed struct {
 65	Feeds        []*Feed
 66	Options      DigestOptions
 67	KeepAliveURL string
 68	UnsubURL     string
 69	DaysLeft     string
 70	ShowBanner   bool
 71	SizeWarning  bool
 72	IsPicoPlus   bool
 73}
 74
 75type DigestOptions struct {
 76	InlineContent bool
 77}
 78
 79func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
 80	return &FeedItemTmpl{
 81		Title:       item.Title,
 82		Link:        item.Link,
 83		PublishedAt: item.PublishedParsed,
 84		Description: html.HTML(item.Description),
 85		Content:     html.HTML(item.Content),
 86	}
 87}
 88
 89func DigestIntervalToCron(interval string) string {
 90	switch interval {
 91	case "10min":
 92		return "*/10 * * * *"
 93	case "1hour":
 94		return "0 * * * *"
 95	case "6hour":
 96		return "0 */6 * * *"
 97	case "12hour":
 98		return "0 */12 * * *"
 99	case "1day", "":
100		return "0 13 * * *"
101	case "7day":
102		return "0 13 * * 0"
103	case "30day":
104		return "0 13 1 * *"
105	default:
106		return "0 13 * * *"
107	}
108}
109
110func getFeedItemID(logger *slog.Logger, item *gofeed.Item) string {
111	guid := strings.ToValidUTF8(item.GUID, "")
112	if item.GUID == "" {
113		logger.Info("no <guid> found for feed item, using <link> instead for its unique id")
114		return strings.ToValidUTF8(item.Link, "")
115	}
116	return guid
117}
118
119// see if this feed item should be emailed to user.
120func isValidItem(logger *slog.Logger, item *gofeed.Item, feedItems []*db.FeedItem) bool {
121	for _, feedItem := range feedItems {
122		if getFeedItemID(logger, item) == feedItem.GUID {
123			return false
124		}
125	}
126
127	return true
128}
129
130type Fetcher struct {
131	cfg  *shared.ConfigSite
132	db   db.DB
133	auth sasl.Client
134	gron *gronx.Gronx
135	host string
136}
137
138func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
139	host := os.Getenv("PICO_SMTP_HOST")
140	smtPass := os.Getenv("PICO_SMTP_PASS")
141	emailLogin := os.Getenv("PICO_SMTP_USER")
142
143	auth := sasl.NewPlainClient("", emailLogin, smtPass)
144	gron := gronx.New()
145	return &Fetcher{
146		db:   dbpool,
147		cfg:  cfg,
148		auth: auth,
149		gron: gron,
150		host: host,
151	}
152}
153
154func DateToMin(now time.Time) time.Time {
155	return time.Date(
156		now.Year(), now.Month(), now.Day(),
157		now.Hour(), now.Minute(),
158		0, 0, // zero out second and nano-second for cron
159		now.Location(),
160	)
161}
162
163func (f *Fetcher) Validate(logger *slog.Logger, post *db.Post, parsed *shared.ListParsedText, now time.Time) error {
164	expiresAt := post.ExpiresAt
165	if expiresAt != nil {
166		if post.ExpiresAt.Before(now) {
167			return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
168		}
169	}
170
171	cron := parsed.Cron
172	// support for posts with deprecated `digest_interval` property
173	if parsed.DigestInterval != "" {
174		cron = DigestIntervalToCron(parsed.DigestInterval)
175	}
176	logger.Info("found cron", "cron", cron)
177
178	if !f.gron.IsValid(cron) {
179		return fmt.Errorf("(%s) is invalid `cron`, skipping", cron)
180	}
181
182	dt := DateToMin(now)
183	isDue, err := f.gron.IsDue(cron, dt)
184	if err != nil {
185		return fmt.Errorf("cron error, skipping; err: %w", err)
186	}
187	if !isDue {
188		nextTime, _ := gronx.NextTickAfter(cron, dt, true)
189		return fmt.Errorf(
190			"cron not time to digest, skipping; cur run: %s, next run: %s",
191			f.gron.C.GetRef(),
192			nextTime,
193		)
194	}
195	return nil
196}
197
198func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, isPicoPlus bool, post *db.Post, skipValidation bool, now time.Time) error {
199	logger = logger.With("filename", post.Filename)
200	logger.Info("running feed post")
201
202	parsed := shared.ListParseText(post.Text)
203
204	if parsed.Email == "" {
205		logger.Error("post does not have an email associated, removing post")
206		err := f.db.RemovePosts([]string{post.ID})
207		if err != nil {
208			return err
209		}
210	}
211
212	if post.Data.LastDigest != nil {
213		logger.Info("last digest", "at", post.Data.LastDigest.Format(time.RFC3339))
214	} else {
215		logger.Info("first time running post")
216	}
217	err := f.Validate(logger, post, parsed, now)
218	if err != nil {
219		logger.Info("validation failed", "err", err)
220		if skipValidation {
221			logger.Info("overriding validation error, continuing")
222		} else {
223			return nil
224		}
225	}
226	logger.Info("validation success")
227
228	urls := []string{}
229	for _, item := range parsed.Items {
230		u := ""
231		if item.IsText || item.IsURL {
232			u = item.Value
233		} else if item.IsURL {
234			u = string(item.Value)
235		}
236
237		if u == "" {
238			continue
239		}
240
241		_, err := url.Parse(string(item.URL))
242		if err != nil {
243			logger.Info("invalid url", "url", string(item.URL))
244			continue
245		}
246
247		logger.Info("found rss feed url", "url", u)
248		urls = append(urls, u)
249	}
250
251	if post.ExpiresAt == nil {
252		expiresAt := now.AddDate(0, 12, 0)
253		post.ExpiresAt = &expiresAt
254	}
255	_, err = f.db.UpdatePost(post)
256	if err != nil {
257		return err
258	}
259
260	subject := fmt.Sprintf("%s feed digest", post.Filename)
261	unsubURL := getUnsubURL(post)
262
263	msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, isPicoPlus, post)
264	if err != nil {
265		errForUser := err
266
267		// we don't want to increment in this case
268		if errors.Is(errForUser, ErrNoRecentArticles) {
269			return nil
270		}
271
272		post.Data.Attempts += 1
273		logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)
274
275		maxAttempts := 10
276		errBody := fmt.Sprintf(`There was an error attempting to fetch your feeds (%d) times.  After (%d) attempts we remove the file from our system.  Please check all the URLs and re-upload.
277Also, we have centralized logs in our pico.sh TUI that will display realtime feed errors so you can debug.
278
279
280%s
281
282
283%s`, post.Data.Attempts, maxAttempts, errForUser.Error(), post.Text)
284		err = f.SendEmail(
285			logger,
286			user.Name,
287			parsed.Email,
288			subject,
289			unsubURL,
290			&MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
291		)
292		if err != nil {
293			return err
294		}
295
296		if post.Data.Attempts >= maxAttempts {
297			err = f.db.RemovePosts([]string{post.ID})
298			if err != nil {
299				return err
300			}
301		} else {
302			_, err = f.db.UpdatePost(post)
303			if err != nil {
304				return err
305			}
306		}
307		return errForUser
308	} else {
309		post.Data.Attempts = 0
310		_, err := f.db.UpdatePost(post)
311		if err != nil {
312			return err
313		}
314	}
315
316	if msgBody != nil {
317		err = f.SendEmail(logger, user.Name, parsed.Email, subject, unsubURL, msgBody)
318		if err != nil {
319			return err
320		}
321	}
322
323	post.Data.LastDigest = &now
324	_, err = f.db.UpdatePost(post)
325	if err != nil {
326		return err
327	}
328
329	return nil
330}
331
332func (f *Fetcher) RunUser(user *db.User, now time.Time) error {
333	logger := shared.LoggerWithUser(f.cfg.Logger, user)
334	logger.Info("run user")
335	posts, err := f.db.FindPostsByUser(&db.Pager{Num: 100}, user.ID, "feeds")
336	if err != nil {
337		return err
338	}
339	isPicoPlus := false
340	ff, _ := f.db.FindFeature(user.ID, "plus")
341	if ff != nil {
342		if ff.IsValid() {
343			isPicoPlus = true
344		}
345	}
346
347	if len(posts.Data) > 0 {
348		logger.Info("found feed posts", "len", len(posts.Data))
349	}
350
351	for _, post := range posts.Data {
352		err = f.RunPost(logger, user, isPicoPlus, post, false, now)
353		if err != nil {
354			logger.Error("run post failed", "err", err)
355		}
356	}
357
358	return nil
359}
360
361func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
362	req, err := http.NewRequest("GET", url, nil)
363	if err != nil {
364		return nil, err
365	}
366
367	resp, err := httpClient.Do(req)
368	if err != nil {
369		return nil, err
370	}
371
372	defer func() {
373		_ = resp.Body.Close()
374	}()
375	body, err := io.ReadAll(resp.Body)
376	if err != nil {
377		return nil, err
378	}
379
380	if resp.StatusCode < 200 || resp.StatusCode > 300 {
381		return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
382	}
383
384	feed, err := fp.ParseString(string(body))
385	if err != nil {
386		return nil, err
387	}
388
389	return feed, nil
390}
391
392func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
393	logger.Info("fetching feed", "url", url)
394
395	feed, err := f.ParseURL(fp, url)
396	if err != nil {
397		return nil, err
398	}
399
400	feedTmpl := &Feed{
401		Title:       feed.Title,
402		Description: feed.Description,
403		Link:        feed.Link,
404	}
405
406	items := []*FeedItemTmpl{}
407	gofeedItems := []*gofeed.Item{}
408	// we only want to return feed items published since the last digest time we fetched
409	for _, item := range feed.Items {
410		if item == nil {
411			continue
412		}
413
414		if !isValidItem(logger, item, feedItems) {
415			logger.Info("feed item already served", "guid", item.GUID)
416			continue
417		}
418
419		gofeedItems = append(gofeedItems, item)
420		items = append(items, itemToTemplate(item))
421	}
422
423	if len(items) == 0 {
424		return nil, fmt.Errorf(
425			"%s %w, skipping",
426			url,
427			ErrNoRecentArticles,
428		)
429	}
430
431	feedTmpl.FeedItems = gofeedItems
432	feedTmpl.Items = items
433	return feedTmpl, nil
434}
435
436func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
437	ts, err := template.ParseFiles(
438		f.cfg.StaticPath("html/digest_text.page.tmpl"),
439	)
440
441	if err != nil {
442		return "", err
443	}
444
445	w := new(strings.Builder)
446	err = ts.Execute(w, feedTmpl)
447	if err != nil {
448		return "", err
449	}
450
451	return w.String(), nil
452}
453
454func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
455	ts, err := html.ParseFiles(
456		f.cfg.StaticPath("html/digest.page.tmpl"),
457	)
458
459	if err != nil {
460		return "", err
461	}
462
463	w := new(strings.Builder)
464	err = ts.Execute(w, feedTmpl)
465	if err != nil {
466		return "", err
467	}
468
469	return w.String(), nil
470}
471
472type MsgBody struct {
473	Html string
474	Text string
475}
476
477func getUnsubURL(post *db.Post) string {
478	return fmt.Sprintf("https://feeds.pico.sh/unsub/%s", post.ID)
479}
480
481func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, isPicoPlus bool, post *db.Post) (*MsgBody, error) {
482	logger.Info("fetching feeds", "inlineContent", inlineContent)
483	fp := gofeed.NewParser()
484	daysLeft := ""
485	showBanner := false
486	if post.ExpiresAt != nil {
487		diff := time.Until(*post.ExpiresAt)
488		daysLeftInt := int(math.Ceil(diff.Hours() / 24))
489		daysLeft = fmt.Sprintf("%d", daysLeftInt)
490		if daysLeftInt <= 30 {
491			showBanner = true
492		}
493	}
494	feeds := &DigestFeed{
495		KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
496		UnsubURL:     getUnsubURL(post),
497		DaysLeft:     daysLeft,
498		ShowBanner:   showBanner,
499		Options:      DigestOptions{InlineContent: inlineContent},
500		IsPicoPlus:   isPicoPlus,
501	}
502	feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
503	if err != nil {
504		return nil, err
505	}
506
507	if len(urls) == 0 {
508		return nil, fmt.Errorf("feed file does not contain any urls")
509	}
510
511	var allErrors error
512	for _, url := range urls {
513		feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
514		if err != nil {
515			if errors.Is(err, ErrNoRecentArticles) {
516				logger.Info("no recent articles", "err", err)
517			} else {
518				allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
519				logger.Error("fetch error", "err", err)
520			}
521			continue
522		}
523		feeds.Feeds = append(feeds.Feeds, feedTmpl)
524	}
525
526	if len(feeds.Feeds) == 0 {
527		if allErrors != nil {
528			return nil, allErrors
529		}
530		return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
531	}
532
533	fdi := []*db.FeedItem{}
534	for _, feed := range feeds.Feeds {
535		for _, item := range feed.FeedItems {
536			uid := getFeedItemID(logger, item)
537			fdi = append(fdi, &db.FeedItem{
538				PostID: post.ID,
539				GUID:   uid,
540				Data: db.FeedItemData{
541					Title:       strings.ToValidUTF8(item.Title, ""),
542					Description: strings.ToValidUTF8(item.Description, ""),
543					Content:     strings.ToValidUTF8(item.Content, ""),
544					Link:        item.Link,
545					PublishedAt: item.PublishedParsed,
546				},
547			})
548		}
549	}
550	err = f.db.InsertFeedItems(post.ID, fdi)
551	if err != nil {
552		return nil, err
553	}
554
555	text, err := f.PrintText(feeds)
556	if err != nil {
557		return nil, err
558	}
559
560	html, err := f.PrintHtml(feeds)
561	if err != nil {
562		return nil, err
563	}
564
565	// cap body size to prevent abuse
566	if len(html)+len(text) > 5*shared.MB {
567		feeds.Options.InlineContent = false
568		feeds.SizeWarning = true
569		html, err = f.PrintHtml(feeds)
570		if err != nil {
571			return nil, err
572		}
573	}
574
575	if allErrors != nil {
576		text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
577		html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
578	}
579
580	return &MsgBody{
581		Text: text,
582		Html: html,
583	}, nil
584}
585
586func (f *Fetcher) SendEmail(logger *slog.Logger, username, email, subject, unsubURL string, msg *MsgBody) error {
587	if email == "" {
588		return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
589	}
590	smtpAddr := f.host
591	fromEmail := "hello@pico.sh"
592	to := []string{email}
593	headers := map[string]string{
594		"From":             fromEmail,
595		"Subject":          subject,
596		"To":               email,
597		"MIME-Version":     "1.0",
598		"Content-Type":     `multipart/alternative; boundary="boundary123"`,
599		"List-Unsubscribe": "<" + unsubURL + ">",
600	}
601	var content strings.Builder
602	for k, v := range headers {
603		content.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
604	}
605	content.WriteString("\r\n")
606	content.WriteString("\r\n--boundary123\r\n")
607	content.WriteString("Content-Type: text/plain; charset=\"utf-8\"\r\n")
608	content.WriteString("\r\n" + msg.Text + "\r\n")
609	content.WriteString("--boundary123\r\n")
610	content.WriteString("Content-Type: text/html; charset=\"utf-8\"\r\n")
611	content.WriteString("\r\n" + msg.Html + "\r\n")
612	content.WriteString("--boundary123--")
613
614	reader := strings.NewReader(content.String())
615	logger.Info("sending email digest")
616	err := smtp.SendMail(
617		smtpAddr,
618		f.auth,
619		fromEmail,
620		to,
621		reader,
622	)
623	return err
624}
625
626func (f *Fetcher) Run(now time.Time) error {
627	users, err := f.db.FindUsersWithPost("feeds")
628	if err != nil {
629		return err
630	}
631
632	for _, user := range users {
633		err := f.RunUser(user, now)
634		if err != nil {
635			f.cfg.Logger.Error("run user failed", "err", err)
636			continue
637		}
638	}
639
640	return nil
641}
642
643func (f *Fetcher) Loop() {
644	logger := f.cfg.Logger
645	for {
646		logger.Info("running digest emailer")
647
648		err := f.Run(time.Now().UTC())
649		if err != nil {
650			logger.Error("run failed", "err", err)
651		}
652
653		logger.Info("digest emailer finished, waiting 1min ...")
654		time.Sleep(1 * time.Minute)
655	}
656}