repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / apps / auth
Eric Bower  ·  2026-05-03

analytics_aggregate.go

  1package auth
  2
import (
	"context"
	"fmt"
	"log/slog"
	"slices"
	"strings"
	"time"

	"github.com/picosh/pico/pkg/db/postgres"
)
 12
// visitRow represents a single raw visit record for aggregation.
// Fields are filled by fetchVisitsForPair from the analytics_visits columns
// user_id, host, path, ip_address, referer, status, user_agent and created_at.
type visitRow struct {
	UserID    string
	Host      string
	Path      string
	IPAddress string
	Referer   string
	// Status is the numeric status column; 404 rows are excluded from the
	// daily visitor and referer aggregates.
	Status    int
	UserAgent string
	CreatedAt time.Time
}
 24
// dayStats accumulates per-day visitor counts for a (user_id, host) pair.
// Each map is used as a set of IP addresses; its length is the stored count.
type dayStats struct {
	// Date is the day being counted, expressed as midnight UTC.
	Date       time.Time
	// AllIPs holds every IP seen that day on a non-404 visit.
	AllIPs     map[string]bool
	// MobileIPs holds IPs whose user agent matched isMobile.
	MobileIPs  map[string]bool
	// DesktopIPs holds IPs whose user agent did not match isMobile. An IP can
	// appear in both MobileIPs and DesktopIPs if it used both kinds of agent.
	DesktopIPs map[string]bool
}
 32
// pathStats accumulates per-path visitor counts for a (user_id, host, status_code) triplet.
// aggregateReferers reuses it for referers: Path then holds the referer and
// Status is left at its zero value.
type pathStats struct {
	Path   string
	Status int
	// IPs is the set of distinct IP addresses; len(IPs) is the unique visit count.
	IPs    map[string]bool
}
 39
// userHostPair represents a unique (user_id, host) combination.
// It is the unit of work in RunAnalyticsAggregation: each pair is fetched and
// aggregated independently.
type userHostPair struct {
	UserID string
	Host   string
}
 45
 46// RunAnalyticsAggregation aggregates raw visits for the given month into summary tables.
 47// Exported for use by the analytics-aggregate CLI script.
 48func RunAnalyticsAggregation(dbpool *postgres.PsqlDB, logger *slog.Logger, targetMonth time.Time) error {
 49	monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, time.UTC)
 50	monthEnd := monthStart.AddDate(0, 1, 0)
 51
 52	logger.Info("starting analytics aggregation", "month", targetMonth.Format("2006-01"), "from", monthStart, "to", monthEnd)
 53
 54	pairs, err := fetchUserHostPairs(dbpool, monthStart, monthEnd)
 55	if err != nil {
 56		return fmt.Errorf("fetch user/host pairs: %w", err)
 57	}
 58	if len(pairs) == 0 {
 59		logger.Info("no user/host pairs to aggregate", "month", targetMonth.Format("2006-01"))
 60		return nil
 61	}
 62	logger.Info("found user/host pairs to process", "count", len(pairs), "month", targetMonth.Format("2006-01"))
 63
 64	for _, pair := range pairs {
 65		visits, err := fetchVisitsForPair(dbpool, pair.UserID, pair.Host, monthStart, monthEnd)
 66		if err != nil {
 67			logger.Error("failed to fetch visits", "err", err, "user_id", pair.UserID, "host", pair.Host)
 68			continue
 69		}
 70
 71		// Aggregate daily stats
 72		dayMap := aggregateDays(visits)
 73		if err := insertMonthlyVisits(dbpool, pair.UserID, pair.Host, dayMap); err != nil {
 74			logger.Error("failed to insert monthly visits", "err", err)
 75			continue
 76		}
 77
 78		// Aggregate top URLs grouped by status code
 79		topURLsByStatus := aggregatePaths(visits)
 80		for status, paths := range topURLsByStatus {
 81			if err := insertTopURLs(dbpool, pair.UserID, pair.Host, targetMonth, status, paths); err != nil {
 82				logger.Error("failed to insert top URLs", "err", err, "status", status)
 83				continue
 84			}
 85		}
 86
 87		// Aggregate top referers
 88		topReferers := aggregateReferers(visits)
 89		if err := insertTopReferers(dbpool, pair.UserID, pair.Host, targetMonth, topReferers); err != nil {
 90			logger.Error("failed to insert top referers", "err", err)
 91			continue
 92		}
 93
 94		// Upsert user site
 95		totalUnique := countTotalUnique(dayMap)
 96		if err := upsertUserSite(dbpool, pair.UserID, pair.Host, totalUnique, targetMonth); err != nil {
 97			logger.Error("failed to upsert user site", "err", err)
 98			continue
 99		}
100	}
101
102	// Delete raw visit data for users with analytics enabled
103	if err := deleteAggregatedVisits(dbpool, logger, monthStart, monthEnd); err != nil {
104		logger.Error("failed to delete aggregated visits", "err", err, "month", targetMonth.Format("2006-01"))
105	}
106
107	logger.Info("analytics aggregation complete", "month", targetMonth.Format("2006-01"))
108	return nil
109}
110
111// analyticsAggregationCron runs once per day to check if the previous month needs aggregation.
112func analyticsAggregationCron(ctx context.Context, dbpool *postgres.PsqlDB, logger *slog.Logger) {
113	// Check at midnight UTC every day
114	ticker := time.NewTicker(24 * time.Hour)
115	defer ticker.Stop()
116
117	// Run immediately on startup to catch up if the service was down
118	if err := checkAndRunAggregation(dbpool, logger); err != nil {
119		logger.Error("startup analytics aggregation check failed", "err", err)
120	}
121
122	for {
123		select {
124		case <-ctx.Done():
125			logger.Info("analytics aggregation cron stopped")
126			return
127		case <-ticker.C:
128			if err := checkAndRunAggregation(dbpool, logger); err != nil {
129				logger.Error("analytics aggregation check failed", "err", err)
130			}
131		}
132	}
133}
134
135// checkAndRunAggregation checks if the previous month has been aggregated and runs if needed.
136func checkAndRunAggregation(dbpool *postgres.PsqlDB, logger *slog.Logger) error {
137	prevMonth := time.Now().AddDate(0, -1, 0)
138	monthStart := time.Date(prevMonth.Year(), prevMonth.Month(), 1, 0, 0, 0, 0, time.UTC)
139	monthEnd := monthStart.AddDate(0, 1, 0)
140
141	// Check if any data exists for this month in the summary table
142	var count int
143	err := dbpool.Db.QueryRowContext(context.Background(),
144		`SELECT COUNT(DISTINCT host) FROM analytics_monthly_visits WHERE visit_date >= $1 AND visit_date < $2`,
145		monthStart, monthEnd,
146	).Scan(&count)
147	if err != nil {
148		return fmt.Errorf("check existing aggregation: %w", err)
149	}
150
151	if count > 0 {
152		logger.Info("previous month already aggregated, skipping", "month", prevMonth.Format("2006-01"), "hosts", count)
153		return nil
154	}
155
156	logger.Info("previous month not aggregated, running now", "month", prevMonth.Format("2006-01"))
157	return RunAnalyticsAggregation(dbpool, logger, prevMonth)
158}
159
160func fetchUserHostPairs(dbpool *postgres.PsqlDB, monthStart, monthEnd time.Time) ([]userHostPair, error) {
161	rows, err := dbpool.Db.Queryx(
162		`SELECT DISTINCT user_id, host FROM analytics_visits WHERE created_at >= $1 AND created_at < $2 AND host <> '' ORDER BY user_id, host`,
163		monthStart, monthEnd,
164	)
165	if err != nil {
166		return nil, err
167	}
168	defer func() { _ = rows.Close() }()
169
170	var pairs []userHostPair
171	for rows.Next() {
172		var p userHostPair
173		if err := rows.Scan(&p.UserID, &p.Host); err != nil {
174			return nil, err
175		}
176		pairs = append(pairs, p)
177	}
178	return pairs, rows.Err()
179}
180
181func fetchVisitsForPair(dbpool *postgres.PsqlDB, userID, host string, monthStart, monthEnd time.Time) ([]visitRow, error) {
182	rows, err := dbpool.Db.Queryx(
183		`SELECT user_id, host, path, ip_address, referer, status, user_agent, created_at
184		 FROM analytics_visits
185		 WHERE user_id = $1 AND host = $2 AND created_at >= $3 AND created_at < $4`,
186		userID, host, monthStart, monthEnd,
187	)
188	if err != nil {
189		return nil, err
190	}
191	defer func() { _ = rows.Close() }()
192
193	var visits []visitRow
194	for rows.Next() {
195		var v visitRow
196		if err := rows.Scan(&v.UserID, &v.Host, &v.Path, &v.IPAddress, &v.Referer, &v.Status, &v.UserAgent, &v.CreatedAt); err != nil {
197			return nil, err
198		}
199		visits = append(visits, v)
200	}
201	return visits, rows.Err()
202}
203
204func aggregateDays(visits []visitRow) map[string]*dayStats {
205	dayMap := make(map[string]*dayStats)
206
207	for _, v := range visits {
208		// Exclude 404s from unique visitor counts — bots hitting non-existent
209		// paths shouldn't count as visitors
210		if v.Status == 404 {
211			continue
212		}
213
214		dateKey := v.CreatedAt.UTC().Format("2006-01-02")
215		ds, ok := dayMap[dateKey]
216		if !ok {
217			ds = &dayStats{
218				Date:       time.Date(v.CreatedAt.Year(), v.CreatedAt.Month(), v.CreatedAt.Day(), 0, 0, 0, 0, time.UTC),
219				AllIPs:     make(map[string]bool),
220				MobileIPs:  make(map[string]bool),
221				DesktopIPs: make(map[string]bool),
222			}
223			dayMap[dateKey] = ds
224		}
225
226		ds.AllIPs[v.IPAddress] = true
227		if isMobile(v.UserAgent) {
228			ds.MobileIPs[v.IPAddress] = true
229		} else {
230			ds.DesktopIPs[v.IPAddress] = true
231		}
232	}
233	return dayMap
234}
235
236func aggregatePaths(visits []visitRow) map[int][]pathStats {
237	// Maps status_code -> list of pathStats
238	statusMap := make(map[int]map[string]*pathStats)
239
240	for _, v := range visits {
241		if v.Path == "" {
242			continue
243		}
244
245		if _, ok := statusMap[v.Status]; !ok {
246			statusMap[v.Status] = make(map[string]*pathStats)
247		}
248
249		key := fmt.Sprintf("%s/%d", v.Path, v.Status)
250		ps, ok := statusMap[v.Status][key]
251		if !ok {
252			ps = &pathStats{Path: v.Path, Status: v.Status, IPs: make(map[string]bool)}
253			statusMap[v.Status][key] = ps
254		}
255		ps.IPs[v.IPAddress] = true
256	}
257
258	result := make(map[int][]pathStats)
259	for status, paths := range statusMap {
260		list := make([]pathStats, 0, len(paths))
261		for _, ps := range paths {
262			list = append(list, *ps)
263		}
264		sortByCount(list)
265		result[status] = list
266	}
267	return result
268}
269
270func aggregateReferers(visits []visitRow) []pathStats {
271	refMap := make(map[string]*pathStats)
272
273	for _, v := range visits {
274		if v.Referer == "" || v.Status == 404 {
275			continue
276		}
277
278		rs, ok := refMap[v.Referer]
279		if !ok {
280			rs = &pathStats{Path: v.Referer, IPs: make(map[string]bool)}
281			refMap[v.Referer] = rs
282		}
283		rs.IPs[v.IPAddress] = true
284	}
285
286	result := make([]pathStats, 0, len(refMap))
287	for _, rs := range refMap {
288		result = append(result, *rs)
289	}
290	sortByCount(result)
291	return result
292}
293
294func sortByCount(paths []pathStats) {
295	for i := 1; i < len(paths); i++ {
296		for j := i; j > 0 && len(paths[j].IPs) > len(paths[j-1].IPs); j-- {
297			paths[j], paths[j-1] = paths[j-1], paths[j]
298		}
299	}
300}
301
302func insertMonthlyVisits(dbpool *postgres.PsqlDB, userID, host string, dayMap map[string]*dayStats) error {
303	for _, ds := range dayMap {
304		_, err := dbpool.Db.Exec(
305			`INSERT INTO analytics_monthly_visits (user_id, host, visit_date, unique_visits, mobile_visits, desktop_visits)
306			 VALUES ($1, $2, $3, $4, $5, $6)
307			 ON CONFLICT (user_id, host, visit_date) DO UPDATE
308			 SET unique_visits = EXCLUDED.unique_visits,
309			     mobile_visits = EXCLUDED.mobile_visits,
310			     desktop_visits = EXCLUDED.desktop_visits`,
311			userID, host, ds.Date,
312			len(ds.AllIPs), len(ds.MobileIPs), len(ds.DesktopIPs),
313		)
314		if err != nil {
315			return fmt.Errorf("insert monthly visits for %s: %w", ds.Date.Format("2006-01-02"), err)
316		}
317	}
318	return nil
319}
320
321func insertTopURLs(dbpool *postgres.PsqlDB, userID, host string, month time.Time, statusCode int, paths []pathStats) error {
322	limit := 10
323	if statusCode == 404 {
324		limit = 100
325	}
326	if len(paths) > limit {
327		paths = paths[:limit]
328	}
329
330	for rank, ps := range paths {
331		_, err := dbpool.Db.Exec(
332			`INSERT INTO analytics_monthly_top_urls (user_id, host, month, path, unique_visits, status_code, rank)
333			 VALUES ($1, $2, $3, $4, $5, $6, $7)
334			 ON CONFLICT (user_id, host, month, path, status_code) DO UPDATE
335			 SET unique_visits = EXCLUDED.unique_visits,
336			     rank = EXCLUDED.rank`,
337			userID, host, month, ps.Path, len(ps.IPs), statusCode, rank+1,
338		)
339		if err != nil {
340			return fmt.Errorf("insert top url %s: %w", ps.Path, err)
341		}
342	}
343	return nil
344}
345
346func insertTopReferers(dbpool *postgres.PsqlDB, userID, host string, month time.Time, refs []pathStats) error {
347	limit := 10
348	if len(refs) > limit {
349		refs = refs[:limit]
350	}
351
352	for rank, rs := range refs {
353		_, err := dbpool.Db.Exec(
354			`INSERT INTO analytics_monthly_top_referers (user_id, host, month, referer, unique_visits, rank)
355			 VALUES ($1, $2, $3, $4, $5, $6)
356			 ON CONFLICT (user_id, host, month, referer) DO UPDATE
357			 SET unique_visits = EXCLUDED.unique_visits,
358			     rank = EXCLUDED.rank`,
359			userID, host, month, rs.Path, len(rs.IPs), rank+1,
360		)
361		if err != nil {
362			return fmt.Errorf("insert top referer %s: %w", rs.Path, err)
363		}
364	}
365	return nil
366}
367
368func countTotalUnique(dayMap map[string]*dayStats) int {
369	allIPs := make(map[string]bool)
370	for _, ds := range dayMap {
371		for ip := range ds.AllIPs {
372			allIPs[ip] = true
373		}
374	}
375	return len(allIPs)
376}
377
378func upsertUserSite(dbpool *postgres.PsqlDB, userID, host string, monthUnique int, month time.Time) error {
379	_, err := dbpool.Db.Exec(
380		`INSERT INTO analytics_user_sites (user_id, host, total_visits, last_seen)
381		 VALUES ($1, $2, $3, $4)
382		 ON CONFLICT (user_id, host) DO UPDATE
383		 SET total_visits = analytics_user_sites.total_visits + EXCLUDED.total_visits,
384		     last_seen = EXCLUDED.last_seen`,
385		userID, host, monthUnique, month,
386	)
387	return err
388}
389
// isMobile reports whether userAgent contains, case-insensitively, one of a
// fixed set of substrings commonly found in mobile user agents.
// TODO: replace with a proper UA parser library (e.g., mssola/useragent).
func isMobile(userAgent string) bool {
	lowered := strings.ToLower(userAgent)
	switch {
	case strings.Contains(lowered, "mobile"),
		strings.Contains(lowered, "android"),
		strings.Contains(lowered, "iphone"),
		strings.Contains(lowered, "ipad"),
		strings.Contains(lowered, "ipod"),
		strings.Contains(lowered, "blackberry"),
		strings.Contains(lowered, "windows phone"):
		return true
	default:
		return false
	}
}
410
411// deleteAggregatedVisits deletes raw visit data from analytics_visits for the given month.
412// Data is preserved in summary tables, so this is safe regardless of feature flags.
413// Raw data for the current and previous months is never deleted since visitUniqueFromRaw
414// still reads from analytics_visits for those months.
415func deleteAggregatedVisits(dbpool *postgres.PsqlDB, logger *slog.Logger, monthStart, monthEnd time.Time) error {
416	now := time.Now()
417	currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
418	previousMonthStart := currentMonthStart.AddDate(0, -1, 0)
419
420	// Never delete raw data for the current or previous month —
421	// visitUniqueFromRaw still reads from analytics_visits for those months.
422	if !monthStart.Before(previousMonthStart) {
423		logger.Info("skipping raw visit deletion for recent month", "month_start", monthStart.Format("2006-01"))
424		return nil
425	}
426
427	result, err := dbpool.Db.Exec(`
428		DELETE FROM analytics_visits
429		WHERE created_at >= $1 AND created_at < $2`, monthStart, monthEnd)
430	if err != nil {
431		return fmt.Errorf("delete aggregated visits: %w", err)
432	}
433
434	deleted, _ := result.RowsAffected()
435	logger.Info("deleted aggregated visits", "month_start", monthStart.Format("2006-01"), "deleted", deleted)
436	return nil
437}