repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / feeds
Eric Bower  ·  2026-03-08

cron.go

  1package feeds
  2
  3import (
  4	"crypto/tls"
  5	"errors"
  6	"fmt"
  7	html "html/template"
  8	"io"
  9	"log/slog"
 10	"math"
 11	"net/http"
 12	"net/url"
 13	"os"
 14	"strings"
 15	"text/template"
 16	"time"
 17
 18	"github.com/adhocore/gronx"
 19	"github.com/emersion/go-sasl"
 20	"github.com/emersion/go-smtp"
 21	"github.com/mmcdole/gofeed"
 22	"github.com/picosh/pico/pkg/db"
 23	"github.com/picosh/pico/pkg/shared"
 24)
 25
 26var ErrNoRecentArticles = errors.New("no recent articles")
 27
 28type UserAgentTransport struct {
 29	http.RoundTripper
 30}
 31
 32func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
 33	userAgent := "linux:feeds:v2 (by /u/pico-sh)"
 34	r.Header.Set("User-Agent", userAgent)
 35	r.Header.Set("Accept", "*/*")
 36	return c.RoundTripper.RoundTrip(r)
 37}
 38
 39var httpClient = http.Client{
 40	Transport: &UserAgentTransport{
 41		&http.Transport{
 42			TLSClientConfig: &tls.Config{},
 43		},
 44	},
 45}
 46
 47type FeedItemTmpl struct {
 48	GUID        string
 49	Title       string
 50	Link        string
 51	PublishedAt *time.Time
 52	Content     html.HTML
 53	Description html.HTML
 54}
 55
 56type Feed struct {
 57	Title       string
 58	Link        string
 59	Description string
 60	Items       []*FeedItemTmpl
 61	FeedItems   []*gofeed.Item
 62}
 63
 64type DigestFeed struct {
 65	Feeds        []*Feed
 66	Options      DigestOptions
 67	KeepAliveURL string
 68	UnsubURL     string
 69	DaysLeft     string
 70	ShowBanner   bool
 71	SizeWarning  bool
 72	IsPicoPlus   bool
 73}
 74
 75type DigestOptions struct {
 76	InlineContent bool
 77}
 78
 79func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
 80	return &FeedItemTmpl{
 81		Title:       item.Title,
 82		Link:        item.Link,
 83		PublishedAt: item.PublishedParsed,
 84		Description: html.HTML(item.Description),
 85		Content:     html.HTML(item.Content),
 86	}
 87}
 88
 89func DigestIntervalToCron(interval string) string {
 90	switch interval {
 91	case "10min":
 92		return "*/10 * * * *"
 93	case "1hour":
 94		return "0 * * * *"
 95	case "6hour":
 96		return "0 */6 * * *"
 97	case "12hour":
 98		return "0 */12 * * *"
 99	case "1day", "":
100		return "0 13 * * *"
101	case "7day":
102		return "0 13 * * 0"
103	case "30day":
104		return "0 13 1 * *"
105	default:
106		return "0 13 * * *"
107	}
108}
109
110func getFeedItemID(logger *slog.Logger, item *gofeed.Item) string {
111	guid := strings.ToValidUTF8(item.GUID, "")
112	if item.GUID == "" {
113		logger.Info("no <guid> found for feed item, using <link> instead for its unique id")
114		return strings.ToValidUTF8(item.Link, "")
115	}
116	return guid
117}
118
119// see if this feed item should be emailed to user.
120func isValidItem(logger *slog.Logger, item *gofeed.Item, feedItems []*db.FeedItem) bool {
121	for _, feedItem := range feedItems {
122		if getFeedItemID(logger, item) == feedItem.GUID {
123			return false
124		}
125	}
126
127	return true
128}
129
130type Fetcher struct {
131	cfg  *shared.ConfigSite
132	db   db.DB
133	auth sasl.Client
134	gron *gronx.Gronx
135	host string
136}
137
138func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
139	host := os.Getenv("PICO_SMTP_HOST")
140	smtPass := os.Getenv("PICO_SMTP_PASS")
141	emailLogin := os.Getenv("PICO_SMTP_USER")
142
143	auth := sasl.NewPlainClient("", emailLogin, smtPass)
144	gron := gronx.New()
145	return &Fetcher{
146		db:   dbpool,
147		cfg:  cfg,
148		auth: auth,
149		gron: gron,
150		host: host,
151	}
152}
153
154func DateToMin(now time.Time) time.Time {
155	return time.Date(
156		now.Year(), now.Month(), now.Day(),
157		now.Hour(), now.Minute(),
158		0, 0, // zero out second and nano-second for cron
159		now.Location(),
160	)
161}
162
163func (f *Fetcher) Validate(logger *slog.Logger, post *db.Post, parsed *shared.ListParsedText, now time.Time) error {
164	expiresAt := post.ExpiresAt
165	if expiresAt != nil {
166		if post.ExpiresAt.Before(now) {
167			return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
168		}
169	}
170
171	cron := parsed.Cron
172	// support for posts with deprecated `digest_interval` property
173	if parsed.DigestInterval != "" {
174		cron = DigestIntervalToCron(parsed.DigestInterval)
175	}
176	logger.Info("found cron", "cron", cron)
177
178	if !f.gron.IsValid(cron) {
179		return fmt.Errorf("(%s) is invalid `cron`, skipping", cron)
180	}
181
182	dt := DateToMin(now)
183	isDue, err := f.gron.IsDue(cron, dt)
184	if err != nil {
185		return fmt.Errorf("cron error, skipping; err: %w", err)
186	}
187	if !isDue {
188		nextTime, _ := gronx.NextTickAfter(cron, dt, true)
189		return fmt.Errorf(
190			"cron not time to digest, skipping; cur run: %s, next run: %s",
191			f.gron.C.GetRef(),
192			nextTime,
193		)
194	}
195	return nil
196}
197
198func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, isPicoPlus bool, post *db.Post, skipValidation bool, now time.Time) error {
199	logger = logger.With("filename", post.Filename)
200	logger.Info("running feed post")
201
202	parsed := shared.ListParseText(post.Text)
203
204	if parsed.Email == "" {
205		logger.Error("post does not have an email associated, removing post")
206		err := f.db.RemovePosts([]string{post.ID})
207		if err != nil {
208			return err
209		}
210	}
211
212	if post.Data.LastDigest != nil {
213		logger.Info("last digest", "at", post.Data.LastDigest.Format(time.RFC3339))
214	} else {
215		logger.Info("first time running post")
216	}
217	err := f.Validate(logger, post, parsed, now)
218	if err != nil {
219		logger.Info("validation failed", "err", err)
220		if skipValidation {
221			logger.Info("overriding validation error, continuing")
222		} else {
223			return nil
224		}
225	}
226	logger.Info("validation success")
227
228	urls := []string{}
229	for _, item := range parsed.Items {
230		rawUrl := ""
231		if item.IsText {
232			rawUrl = string(item.Value)
233		} else if item.IsURL {
234			rawUrl = string(item.URL)
235		}
236
237		if rawUrl == "" {
238			continue
239		}
240
241		_, err := url.Parse(rawUrl)
242		if err != nil {
243			logger.Info("invalid url", "url", rawUrl)
244			continue
245		}
246
247		logger.Info("found rss feed url", "url", rawUrl)
248		urls = append(urls, rawUrl)
249	}
250
251	if post.ExpiresAt == nil {
252		expiresAt := now.AddDate(0, 12, 0)
253		post.ExpiresAt = &expiresAt
254	}
255	_, err = f.db.UpdatePost(post)
256	if err != nil {
257		return err
258	}
259
260	subject := fmt.Sprintf("%s feed digest", post.Filename)
261	unsubURL := getUnsubURL(post)
262
263	msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, isPicoPlus, post)
264	if err != nil {
265		errForUser := err
266
267		// we don't want to increment in this case
268		if errors.Is(errForUser, ErrNoRecentArticles) {
269			return nil
270		}
271
272		post.Data.Attempts += 1
273		logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)
274
275		maxAttempts := 100
276		errBody := fmt.Sprintf(`There was an error attempting to fetch your feeds (%d) times.  After (%d) attempts we remove the file from our system.  Please check all the URLs and re-upload.
277Also, we have centralized logs in our pico.sh TUI that will display realtime feed errors so you can debug.
278
279
280%s
281
282
283%s`, post.Data.Attempts, maxAttempts, errForUser.Error(), post.Text)
284		err = f.SendEmail(
285			logger,
286			user.Name,
287			parsed.Email,
288			subject,
289			unsubURL,
290			&MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
291		)
292		if err != nil {
293			return err
294		}
295
296		if post.Data.Attempts >= maxAttempts {
297			err = f.db.RemovePosts([]string{post.ID})
298			if err != nil {
299				return err
300			}
301		} else {
302			_, err = f.db.UpdatePost(post)
303			if err != nil {
304				return err
305			}
306		}
307		return errForUser
308	} else {
309		post.Data.Attempts = 0
310		_, err := f.db.UpdatePost(post)
311		if err != nil {
312			return err
313		}
314	}
315
316	if msgBody != nil {
317		err = f.SendEmail(logger, user.Name, parsed.Email, subject, unsubURL, msgBody)
318		if err != nil {
319			return err
320		}
321	}
322
323	post.Data.LastDigest = &now
324	_, err = f.db.UpdatePost(post)
325	if err != nil {
326		return err
327	}
328
329	return nil
330}
331
332func (f *Fetcher) RunUser(user *db.User, now time.Time) error {
333	logger := shared.LoggerWithUser(f.cfg.Logger, user)
334	logger.Info("run user")
335	posts, err := f.db.FindPostsByUser(&db.Pager{Num: 100}, user.ID, "feeds")
336	if err != nil {
337		return err
338	}
339	isPicoPlus := false
340	ff, _ := f.db.FindFeature(user.ID, "plus")
341	if ff != nil {
342		if ff.IsValid() {
343			isPicoPlus = true
344		}
345	}
346
347	if !isPicoPlus {
348		return fmt.Errorf("must be pico+ user to receive rss digests")
349	}
350
351	if len(posts.Data) > 0 {
352		logger.Info("found feed posts", "len", len(posts.Data))
353	}
354
355	for _, post := range posts.Data {
356		err = f.RunPost(logger, user, isPicoPlus, post, false, now)
357		if err != nil {
358			logger.Error("run post failed", "err", err)
359		}
360	}
361
362	return nil
363}
364
365func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
366	req, err := http.NewRequest("GET", url, nil)
367	if err != nil {
368		return nil, err
369	}
370
371	resp, err := httpClient.Do(req)
372	if err != nil {
373		return nil, err
374	}
375
376	defer func() {
377		_ = resp.Body.Close()
378	}()
379	body, err := io.ReadAll(resp.Body)
380	if err != nil {
381		return nil, err
382	}
383
384	if resp.StatusCode < 200 || resp.StatusCode > 300 {
385		return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
386	}
387
388	feed, err := fp.ParseString(string(body))
389	if err != nil {
390		return nil, err
391	}
392
393	return feed, nil
394}
395
396func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
397	logger.Info("fetching feed", "url", url)
398
399	feed, err := f.ParseURL(fp, url)
400	if err != nil {
401		return nil, err
402	}
403
404	feedTmpl := &Feed{
405		Title:       feed.Title,
406		Description: feed.Description,
407		Link:        feed.Link,
408	}
409
410	items := []*FeedItemTmpl{}
411	gofeedItems := []*gofeed.Item{}
412	// we only want to return feed items published since the last digest time we fetched
413	for _, item := range feed.Items {
414		if item == nil {
415			continue
416		}
417
418		if !isValidItem(logger, item, feedItems) {
419			logger.Info("feed item already served", "guid", item.GUID)
420			continue
421		}
422
423		gofeedItems = append(gofeedItems, item)
424		items = append(items, itemToTemplate(item))
425	}
426
427	if len(items) == 0 {
428		return nil, fmt.Errorf(
429			"%s %w, skipping",
430			url,
431			ErrNoRecentArticles,
432		)
433	}
434
435	feedTmpl.FeedItems = gofeedItems
436	feedTmpl.Items = items
437	return feedTmpl, nil
438}
439
440func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
441	ts, err := template.ParseFiles(
442		f.cfg.StaticPath("html/digest_text.page.tmpl"),
443	)
444
445	if err != nil {
446		return "", err
447	}
448
449	w := new(strings.Builder)
450	err = ts.Execute(w, feedTmpl)
451	if err != nil {
452		return "", err
453	}
454
455	return w.String(), nil
456}
457
458func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
459	ts, err := html.ParseFiles(
460		f.cfg.StaticPath("html/digest.page.tmpl"),
461	)
462
463	if err != nil {
464		return "", err
465	}
466
467	w := new(strings.Builder)
468	err = ts.Execute(w, feedTmpl)
469	if err != nil {
470		return "", err
471	}
472
473	return w.String(), nil
474}
475
476type MsgBody struct {
477	Html string
478	Text string
479}
480
481func getUnsubURL(post *db.Post) string {
482	return fmt.Sprintf("https://feeds.pico.sh/unsub/%s", post.ID)
483}
484
485func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, isPicoPlus bool, post *db.Post) (*MsgBody, error) {
486	logger.Info("fetching feeds", "inlineContent", inlineContent)
487	fp := gofeed.NewParser()
488	daysLeft := ""
489	showBanner := false
490	if post.ExpiresAt != nil {
491		diff := time.Until(*post.ExpiresAt)
492		daysLeftInt := int(math.Ceil(diff.Hours() / 24))
493		daysLeft = fmt.Sprintf("%d", daysLeftInt)
494		if daysLeftInt <= 30 {
495			showBanner = true
496		}
497	}
498	feeds := &DigestFeed{
499		KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
500		UnsubURL:     getUnsubURL(post),
501		DaysLeft:     daysLeft,
502		ShowBanner:   showBanner,
503		Options:      DigestOptions{InlineContent: inlineContent},
504		IsPicoPlus:   isPicoPlus,
505	}
506	feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
507	if err != nil {
508		return nil, err
509	}
510
511	if len(urls) == 0 {
512		return nil, fmt.Errorf("feed file does not contain any urls")
513	}
514
515	var allErrors error
516	for _, url := range urls {
517		feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
518		if err != nil {
519			if errors.Is(err, ErrNoRecentArticles) {
520				logger.Info("no recent articles", "err", err)
521			} else {
522				allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
523				logger.Error("fetch error", "err", err)
524			}
525			continue
526		}
527		feeds.Feeds = append(feeds.Feeds, feedTmpl)
528	}
529
530	if len(feeds.Feeds) == 0 {
531		if allErrors != nil {
532			return nil, allErrors
533		}
534		return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
535	}
536
537	fdi := []*db.FeedItem{}
538	for _, feed := range feeds.Feeds {
539		for _, item := range feed.FeedItems {
540			uid := getFeedItemID(logger, item)
541			fdi = append(fdi, &db.FeedItem{
542				PostID: post.ID,
543				GUID:   uid,
544				Data: db.FeedItemData{
545					Title:       strings.ToValidUTF8(item.Title, ""),
546					Description: strings.ToValidUTF8(item.Description, ""),
547					Content:     strings.ToValidUTF8(item.Content, ""),
548					Link:        item.Link,
549					PublishedAt: item.PublishedParsed,
550				},
551			})
552		}
553	}
554	err = f.db.InsertFeedItems(post.ID, fdi)
555	if err != nil {
556		return nil, err
557	}
558
559	text, err := f.PrintText(feeds)
560	if err != nil {
561		return nil, err
562	}
563
564	html, err := f.PrintHtml(feeds)
565	if err != nil {
566		return nil, err
567	}
568
569	// cap body size to prevent abuse
570	if len(html)+len(text) > 5*shared.MB {
571		feeds.Options.InlineContent = false
572		feeds.SizeWarning = true
573		html, err = f.PrintHtml(feeds)
574		if err != nil {
575			return nil, err
576		}
577	}
578
579	if allErrors != nil {
580		text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
581		html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
582	}
583
584	return &MsgBody{
585		Text: text,
586		Html: html,
587	}, nil
588}
589
590func (f *Fetcher) SendEmail(logger *slog.Logger, username, email, subject, unsubURL string, msg *MsgBody) error {
591	if email == "" {
592		return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
593	}
594	smtpAddr := f.host
595	fromEmail := "hello@pico.sh"
596	to := []string{email}
597	headers := map[string]string{
598		"From":             fromEmail,
599		"Subject":          subject,
600		"To":               email,
601		"MIME-Version":     "1.0",
602		"Content-Type":     `multipart/alternative; boundary="boundary123"`,
603		"List-Unsubscribe": "<" + unsubURL + ">",
604	}
605	var content strings.Builder
606	for k, v := range headers {
607		fmt.Fprintf(&content, "%s: %s\r\n", k, v)
608	}
609	content.WriteString("\r\n")
610	content.WriteString("\r\n--boundary123\r\n")
611	content.WriteString("Content-Type: text/plain; charset=\"utf-8\"\r\n")
612	content.WriteString("\r\n" + msg.Text + "\r\n")
613	content.WriteString("--boundary123\r\n")
614	content.WriteString("Content-Type: text/html; charset=\"utf-8\"\r\n")
615	content.WriteString("\r\n" + msg.Html + "\r\n")
616	content.WriteString("--boundary123--")
617
618	reader := strings.NewReader(content.String())
619	logger.Info("sending email digest")
620	err := smtp.SendMail(
621		smtpAddr,
622		f.auth,
623		fromEmail,
624		to,
625		reader,
626	)
627	return err
628}
629
630func (f *Fetcher) Run(now time.Time) error {
631	users, err := f.db.FindUsersWithPost("feeds")
632	if err != nil {
633		return err
634	}
635
636	for _, user := range users {
637		err := f.RunUser(user, now)
638		if err != nil {
639			f.cfg.Logger.Error("run user failed", "err", err)
640			continue
641		}
642	}
643
644	return nil
645}
646
647func (f *Fetcher) Loop() {
648	logger := f.cfg.Logger
649	for {
650		logger.Info("running digest emailer")
651
652		err := f.Run(time.Now().UTC())
653		if err != nil {
654			logger.Error("run failed", "err", err)
655		}
656
657		logger.Info("digest emailer finished, waiting 1min ...")
658		time.Sleep(1 * time.Minute)
659	}
660}