Eric Bower
·
2026-05-03
analytics_aggregate.go
1package auth
2
import (
	"context"
	"fmt"
	"log/slog"
	"slices"
	"strings"
	"time"

	"github.com/picosh/pico/pkg/db/postgres"
)
12
// visitRow holds one raw analytics_visits record, as scanned by
// fetchVisitsForPair, that feeds the monthly aggregation.
type visitRow struct {
	// Owner and site host; together they form the aggregation key.
	UserID, Host string

	// Request details. Status is the HTTP status code (404s are treated
	// specially by the aggregators).
	Path, IPAddress, Referer string
	Status                   int
	UserAgent                string

	// When the visit was recorded (the created_at column).
	CreatedAt time.Time
}
24
// dayStats collects the distinct visitor IPs seen on one day for a single
// (user_id, host) pair. The maps are used as sets; only their keys matter.
type dayStats struct {
	Date time.Time // the day being counted, as a midnight timestamp in UTC

	AllIPs, MobileIPs, DesktopIPs map[string]bool
}
32
// pathStats counts distinct visitor IPs for one key inside a
// (user_id, host, status_code) group. aggregatePaths keys it by URL path;
// aggregateReferers reuses it with the referer stored in Path.
type pathStats struct {
	Path   string          // URL path, or the referer for aggregateReferers
	Status int             // HTTP status code; zero for referer entries
	IPs    map[string]bool // set of distinct visitor IP addresses
}
39
// userHostPair names one site (host) owned by one user; it is the unit of
// work that RunAnalyticsAggregation processes.
type userHostPair struct {
	UserID, Host string
}
45
46// RunAnalyticsAggregation aggregates raw visits for the given month into summary tables.
47// Exported for use by the analytics-aggregate CLI script.
48func RunAnalyticsAggregation(dbpool *postgres.PsqlDB, logger *slog.Logger, targetMonth time.Time) error {
49 monthStart := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, time.UTC)
50 monthEnd := monthStart.AddDate(0, 1, 0)
51
52 logger.Info("starting analytics aggregation", "month", targetMonth.Format("2006-01"), "from", monthStart, "to", monthEnd)
53
54 pairs, err := fetchUserHostPairs(dbpool, monthStart, monthEnd)
55 if err != nil {
56 return fmt.Errorf("fetch user/host pairs: %w", err)
57 }
58 if len(pairs) == 0 {
59 logger.Info("no user/host pairs to aggregate", "month", targetMonth.Format("2006-01"))
60 return nil
61 }
62 logger.Info("found user/host pairs to process", "count", len(pairs), "month", targetMonth.Format("2006-01"))
63
64 for _, pair := range pairs {
65 visits, err := fetchVisitsForPair(dbpool, pair.UserID, pair.Host, monthStart, monthEnd)
66 if err != nil {
67 logger.Error("failed to fetch visits", "err", err, "user_id", pair.UserID, "host", pair.Host)
68 continue
69 }
70
71 // Aggregate daily stats
72 dayMap := aggregateDays(visits)
73 if err := insertMonthlyVisits(dbpool, pair.UserID, pair.Host, dayMap); err != nil {
74 logger.Error("failed to insert monthly visits", "err", err)
75 continue
76 }
77
78 // Aggregate top URLs grouped by status code
79 topURLsByStatus := aggregatePaths(visits)
80 for status, paths := range topURLsByStatus {
81 if err := insertTopURLs(dbpool, pair.UserID, pair.Host, targetMonth, status, paths); err != nil {
82 logger.Error("failed to insert top URLs", "err", err, "status", status)
83 continue
84 }
85 }
86
87 // Aggregate top referers
88 topReferers := aggregateReferers(visits)
89 if err := insertTopReferers(dbpool, pair.UserID, pair.Host, targetMonth, topReferers); err != nil {
90 logger.Error("failed to insert top referers", "err", err)
91 continue
92 }
93
94 // Upsert user site
95 totalUnique := countTotalUnique(dayMap)
96 if err := upsertUserSite(dbpool, pair.UserID, pair.Host, totalUnique, targetMonth); err != nil {
97 logger.Error("failed to upsert user site", "err", err)
98 continue
99 }
100 }
101
102 // Delete raw visit data for users with analytics enabled
103 if err := deleteAggregatedVisits(dbpool, logger, monthStart, monthEnd); err != nil {
104 logger.Error("failed to delete aggregated visits", "err", err, "month", targetMonth.Format("2006-01"))
105 }
106
107 logger.Info("analytics aggregation complete", "month", targetMonth.Format("2006-01"))
108 return nil
109}
110
111// analyticsAggregationCron runs once per day to check if the previous month needs aggregation.
112func analyticsAggregationCron(ctx context.Context, dbpool *postgres.PsqlDB, logger *slog.Logger) {
113 // Check at midnight UTC every day
114 ticker := time.NewTicker(24 * time.Hour)
115 defer ticker.Stop()
116
117 // Run immediately on startup to catch up if the service was down
118 if err := checkAndRunAggregation(dbpool, logger); err != nil {
119 logger.Error("startup analytics aggregation check failed", "err", err)
120 }
121
122 for {
123 select {
124 case <-ctx.Done():
125 logger.Info("analytics aggregation cron stopped")
126 return
127 case <-ticker.C:
128 if err := checkAndRunAggregation(dbpool, logger); err != nil {
129 logger.Error("analytics aggregation check failed", "err", err)
130 }
131 }
132 }
133}
134
135// checkAndRunAggregation checks if the previous month has been aggregated and runs if needed.
136func checkAndRunAggregation(dbpool *postgres.PsqlDB, logger *slog.Logger) error {
137 prevMonth := time.Now().AddDate(0, -1, 0)
138 monthStart := time.Date(prevMonth.Year(), prevMonth.Month(), 1, 0, 0, 0, 0, time.UTC)
139 monthEnd := monthStart.AddDate(0, 1, 0)
140
141 // Check if any data exists for this month in the summary table
142 var count int
143 err := dbpool.Db.QueryRowContext(context.Background(),
144 `SELECT COUNT(DISTINCT host) FROM analytics_monthly_visits WHERE visit_date >= $1 AND visit_date < $2`,
145 monthStart, monthEnd,
146 ).Scan(&count)
147 if err != nil {
148 return fmt.Errorf("check existing aggregation: %w", err)
149 }
150
151 if count > 0 {
152 logger.Info("previous month already aggregated, skipping", "month", prevMonth.Format("2006-01"), "hosts", count)
153 return nil
154 }
155
156 logger.Info("previous month not aggregated, running now", "month", prevMonth.Format("2006-01"))
157 return RunAnalyticsAggregation(dbpool, logger, prevMonth)
158}
159
160func fetchUserHostPairs(dbpool *postgres.PsqlDB, monthStart, monthEnd time.Time) ([]userHostPair, error) {
161 rows, err := dbpool.Db.Queryx(
162 `SELECT DISTINCT user_id, host FROM analytics_visits WHERE created_at >= $1 AND created_at < $2 AND host <> '' ORDER BY user_id, host`,
163 monthStart, monthEnd,
164 )
165 if err != nil {
166 return nil, err
167 }
168 defer func() { _ = rows.Close() }()
169
170 var pairs []userHostPair
171 for rows.Next() {
172 var p userHostPair
173 if err := rows.Scan(&p.UserID, &p.Host); err != nil {
174 return nil, err
175 }
176 pairs = append(pairs, p)
177 }
178 return pairs, rows.Err()
179}
180
181func fetchVisitsForPair(dbpool *postgres.PsqlDB, userID, host string, monthStart, monthEnd time.Time) ([]visitRow, error) {
182 rows, err := dbpool.Db.Queryx(
183 `SELECT user_id, host, path, ip_address, referer, status, user_agent, created_at
184 FROM analytics_visits
185 WHERE user_id = $1 AND host = $2 AND created_at >= $3 AND created_at < $4`,
186 userID, host, monthStart, monthEnd,
187 )
188 if err != nil {
189 return nil, err
190 }
191 defer func() { _ = rows.Close() }()
192
193 var visits []visitRow
194 for rows.Next() {
195 var v visitRow
196 if err := rows.Scan(&v.UserID, &v.Host, &v.Path, &v.IPAddress, &v.Referer, &v.Status, &v.UserAgent, &v.CreatedAt); err != nil {
197 return nil, err
198 }
199 visits = append(visits, v)
200 }
201 return visits, rows.Err()
202}
203
204func aggregateDays(visits []visitRow) map[string]*dayStats {
205 dayMap := make(map[string]*dayStats)
206
207 for _, v := range visits {
208 // Exclude 404s from unique visitor counts — bots hitting non-existent
209 // paths shouldn't count as visitors
210 if v.Status == 404 {
211 continue
212 }
213
214 dateKey := v.CreatedAt.UTC().Format("2006-01-02")
215 ds, ok := dayMap[dateKey]
216 if !ok {
217 ds = &dayStats{
218 Date: time.Date(v.CreatedAt.Year(), v.CreatedAt.Month(), v.CreatedAt.Day(), 0, 0, 0, 0, time.UTC),
219 AllIPs: make(map[string]bool),
220 MobileIPs: make(map[string]bool),
221 DesktopIPs: make(map[string]bool),
222 }
223 dayMap[dateKey] = ds
224 }
225
226 ds.AllIPs[v.IPAddress] = true
227 if isMobile(v.UserAgent) {
228 ds.MobileIPs[v.IPAddress] = true
229 } else {
230 ds.DesktopIPs[v.IPAddress] = true
231 }
232 }
233 return dayMap
234}
235
236func aggregatePaths(visits []visitRow) map[int][]pathStats {
237 // Maps status_code -> list of pathStats
238 statusMap := make(map[int]map[string]*pathStats)
239
240 for _, v := range visits {
241 if v.Path == "" {
242 continue
243 }
244
245 if _, ok := statusMap[v.Status]; !ok {
246 statusMap[v.Status] = make(map[string]*pathStats)
247 }
248
249 key := fmt.Sprintf("%s/%d", v.Path, v.Status)
250 ps, ok := statusMap[v.Status][key]
251 if !ok {
252 ps = &pathStats{Path: v.Path, Status: v.Status, IPs: make(map[string]bool)}
253 statusMap[v.Status][key] = ps
254 }
255 ps.IPs[v.IPAddress] = true
256 }
257
258 result := make(map[int][]pathStats)
259 for status, paths := range statusMap {
260 list := make([]pathStats, 0, len(paths))
261 for _, ps := range paths {
262 list = append(list, *ps)
263 }
264 sortByCount(list)
265 result[status] = list
266 }
267 return result
268}
269
270func aggregateReferers(visits []visitRow) []pathStats {
271 refMap := make(map[string]*pathStats)
272
273 for _, v := range visits {
274 if v.Referer == "" || v.Status == 404 {
275 continue
276 }
277
278 rs, ok := refMap[v.Referer]
279 if !ok {
280 rs = &pathStats{Path: v.Referer, IPs: make(map[string]bool)}
281 refMap[v.Referer] = rs
282 }
283 rs.IPs[v.IPAddress] = true
284 }
285
286 result := make([]pathStats, 0, len(refMap))
287 for _, rs := range refMap {
288 result = append(result, *rs)
289 }
290 sortByCount(result)
291 return result
292}
293
294func sortByCount(paths []pathStats) {
295 for i := 1; i < len(paths); i++ {
296 for j := i; j > 0 && len(paths[j].IPs) > len(paths[j-1].IPs); j-- {
297 paths[j], paths[j-1] = paths[j-1], paths[j]
298 }
299 }
300}
301
302func insertMonthlyVisits(dbpool *postgres.PsqlDB, userID, host string, dayMap map[string]*dayStats) error {
303 for _, ds := range dayMap {
304 _, err := dbpool.Db.Exec(
305 `INSERT INTO analytics_monthly_visits (user_id, host, visit_date, unique_visits, mobile_visits, desktop_visits)
306 VALUES ($1, $2, $3, $4, $5, $6)
307 ON CONFLICT (user_id, host, visit_date) DO UPDATE
308 SET unique_visits = EXCLUDED.unique_visits,
309 mobile_visits = EXCLUDED.mobile_visits,
310 desktop_visits = EXCLUDED.desktop_visits`,
311 userID, host, ds.Date,
312 len(ds.AllIPs), len(ds.MobileIPs), len(ds.DesktopIPs),
313 )
314 if err != nil {
315 return fmt.Errorf("insert monthly visits for %s: %w", ds.Date.Format("2006-01-02"), err)
316 }
317 }
318 return nil
319}
320
321func insertTopURLs(dbpool *postgres.PsqlDB, userID, host string, month time.Time, statusCode int, paths []pathStats) error {
322 limit := 10
323 if statusCode == 404 {
324 limit = 100
325 }
326 if len(paths) > limit {
327 paths = paths[:limit]
328 }
329
330 for rank, ps := range paths {
331 _, err := dbpool.Db.Exec(
332 `INSERT INTO analytics_monthly_top_urls (user_id, host, month, path, unique_visits, status_code, rank)
333 VALUES ($1, $2, $3, $4, $5, $6, $7)
334 ON CONFLICT (user_id, host, month, path, status_code) DO UPDATE
335 SET unique_visits = EXCLUDED.unique_visits,
336 rank = EXCLUDED.rank`,
337 userID, host, month, ps.Path, len(ps.IPs), statusCode, rank+1,
338 )
339 if err != nil {
340 return fmt.Errorf("insert top url %s: %w", ps.Path, err)
341 }
342 }
343 return nil
344}
345
346func insertTopReferers(dbpool *postgres.PsqlDB, userID, host string, month time.Time, refs []pathStats) error {
347 limit := 10
348 if len(refs) > limit {
349 refs = refs[:limit]
350 }
351
352 for rank, rs := range refs {
353 _, err := dbpool.Db.Exec(
354 `INSERT INTO analytics_monthly_top_referers (user_id, host, month, referer, unique_visits, rank)
355 VALUES ($1, $2, $3, $4, $5, $6)
356 ON CONFLICT (user_id, host, month, referer) DO UPDATE
357 SET unique_visits = EXCLUDED.unique_visits,
358 rank = EXCLUDED.rank`,
359 userID, host, month, rs.Path, len(rs.IPs), rank+1,
360 )
361 if err != nil {
362 return fmt.Errorf("insert top referer %s: %w", rs.Path, err)
363 }
364 }
365 return nil
366}
367
368func countTotalUnique(dayMap map[string]*dayStats) int {
369 allIPs := make(map[string]bool)
370 for _, ds := range dayMap {
371 for ip := range ds.AllIPs {
372 allIPs[ip] = true
373 }
374 }
375 return len(allIPs)
376}
377
378func upsertUserSite(dbpool *postgres.PsqlDB, userID, host string, monthUnique int, month time.Time) error {
379 _, err := dbpool.Db.Exec(
380 `INSERT INTO analytics_user_sites (user_id, host, total_visits, last_seen)
381 VALUES ($1, $2, $3, $4)
382 ON CONFLICT (user_id, host) DO UPDATE
383 SET total_visits = analytics_user_sites.total_visits + EXCLUDED.total_visits,
384 last_seen = EXCLUDED.last_seen`,
385 userID, host, monthUnique, month,
386 )
387 return err
388}
389
// isMobile reports whether userAgent contains, case-insensitively, any of a
// fixed set of mobile-device markers (mobile, android, iphone, ipad, ipod,
// blackberry, windows phone). Anything else, including "", counts as desktop.
// TODO: replace with a proper UA parser library (e.g., mssola/useragent).
func isMobile(userAgent string) bool {
	lowered := strings.ToLower(userAgent)
	switch {
	case strings.Contains(lowered, "mobile"),
		strings.Contains(lowered, "android"),
		strings.Contains(lowered, "iphone"),
		strings.Contains(lowered, "ipad"),
		strings.Contains(lowered, "ipod"),
		strings.Contains(lowered, "blackberry"),
		strings.Contains(lowered, "windows phone"):
		return true
	}
	return false
}
410
411// deleteAggregatedVisits deletes raw visit data from analytics_visits for the given month.
412// Data is preserved in summary tables, so this is safe regardless of feature flags.
413// Raw data for the current and previous months is never deleted since visitUniqueFromRaw
414// still reads from analytics_visits for those months.
415func deleteAggregatedVisits(dbpool *postgres.PsqlDB, logger *slog.Logger, monthStart, monthEnd time.Time) error {
416 now := time.Now()
417 currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
418 previousMonthStart := currentMonthStart.AddDate(0, -1, 0)
419
420 // Never delete raw data for the current or previous month —
421 // visitUniqueFromRaw still reads from analytics_visits for those months.
422 if !monthStart.Before(previousMonthStart) {
423 logger.Info("skipping raw visit deletion for recent month", "month_start", monthStart.Format("2006-01"))
424 return nil
425 }
426
427 result, err := dbpool.Db.Exec(`
428 DELETE FROM analytics_visits
429 WHERE created_at >= $1 AND created_at < $2`, monthStart, monthEnd)
430 if err != nil {
431 return fmt.Errorf("delete aggregated visits: %w", err)
432 }
433
434 deleted, _ := result.RowsAffected()
435 logger.Info("deleted aggregated visits", "month_start", monthStart.Format("2006-01"), "deleted", deleted)
436 return nil
437}