repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / pkg / db / postgres
Eric Bower  ·  2026-05-05

storage.go

   1package postgres
   2
   3import (
   4	"database/sql"
   5	"errors"
   6	"fmt"
   7	"log/slog"
   8	"math"
   9	"slices"
  10	"sort"
  11	"strings"
  12	"sync"
  13	"time"
  14
  15	"github.com/jmoiron/sqlx"
  16	_ "github.com/lib/pq"
  17	"github.com/picosh/pico/pkg/db"
  18	"github.com/picosh/pico/pkg/shared"
  19)
  20
  21// mobileUserAgentExpr is a SQL expression to detect mobile user agents.
  22const mobileUserAgentExpr = `user_agent ILIKE '%mobile%' OR user_agent ILIKE '%android%' OR user_agent ILIKE '%iphone%' OR user_agent ILIKE '%ipad%' OR user_agent ILIKE '%ipod%' OR user_agent ILIKE '%blackberry%' OR user_agent ILIKE '%windows phone%'`
  23
  24var PAGER_SIZE = 15
  25
  26var SelectPost = `
  27	posts.id, user_id, app_users.name, filename, slug, title, text, description,
  28	posts.created_at, publish_at, posts.updated_at, hidden, file_size, mime_type, shasum, data, expires_at, views`
  29
  30type PsqlDB struct {
  31	Logger *slog.Logger
  32	Db     *sqlx.DB
  33}
  34
  35type RowScanner interface {
  36	Scan(dest ...any) error
  37}
  38
  39func CreatePostWithTagsByRow(r RowScanner) (*db.Post, error) {
  40	post := &db.Post{}
  41	tagStr := ""
  42	err := r.Scan(
  43		&post.ID,
  44		&post.UserID,
  45		&post.Username,
  46		&post.Filename,
  47		&post.Slug,
  48		&post.Title,
  49		&post.Text,
  50		&post.Description,
  51		&post.CreatedAt,
  52		&post.PublishAt,
  53		&post.UpdatedAt,
  54		&post.Hidden,
  55		&post.FileSize,
  56		&post.MimeType,
  57		&post.Shasum,
  58		&post.Data,
  59		&post.ExpiresAt,
  60		&post.Views,
  61		&tagStr,
  62	)
  63	if err != nil {
  64		return nil, err
  65	}
  66
  67	tags := strings.Split(tagStr, ",")
  68	for _, tag := range tags {
  69		tg := strings.TrimSpace(tag)
  70		if tg == "" {
  71			continue
  72		}
  73		post.Tags = append(post.Tags, tg)
  74	}
  75
  76	return post, nil
  77}
  78
  79func NewDB(databaseUrl string, logger *slog.Logger) *PsqlDB {
  80	var err error
  81	d := &PsqlDB{
  82		Logger: logger,
  83	}
  84	d.Logger.Info("Connecting to postgres", "databaseUrl", databaseUrl)
  85
  86	db, err := sqlx.Connect("postgres", databaseUrl)
  87	if err != nil {
  88		d.Logger.Error(err.Error())
  89	}
  90	d.Db = db
  91	return d
  92}
  93
  94func (me *PsqlDB) RegisterUser(username, pubkey, comment string) (*db.User, error) {
  95	lowerName := strings.ToLower(username)
  96	valid, err := me.validateName(lowerName)
  97	if !valid {
  98		return nil, err
  99	}
 100
 101	tx, err := me.Db.Beginx()
 102	if err != nil {
 103		return nil, err
 104	}
 105	defer func() {
 106		_ = tx.Rollback()
 107	}()
 108
 109	var id string
 110	err = tx.QueryRow(`INSERT INTO app_users (name) VALUES($1) returning id`, lowerName).Scan(&id)
 111	if err != nil {
 112		return nil, err
 113	}
 114
 115	err = me.insertPublicKeyWithTx(id, pubkey, comment, tx)
 116	if err != nil {
 117		return nil, err
 118	}
 119
 120	err = tx.Commit()
 121	if err != nil {
 122		return nil, err
 123	}
 124
 125	return me.FindUserByKey(username, pubkey)
 126}
 127
 128func (me *PsqlDB) insertPublicKeyWithTx(userID, key, name string, tx *sqlx.Tx) error {
 129	pk, _ := me.findPublicKeyByKey(key)
 130	if pk != nil {
 131		return db.ErrPublicKeyTaken
 132	}
 133	query := `INSERT INTO public_keys (user_id, public_key, name) VALUES ($1, $2, $3)`
 134	_, err := tx.Exec(query, userID, key, name)
 135	return err
 136}
 137
 138func (me *PsqlDB) InsertPublicKey(userID, key, name string) error {
 139	pk, _ := me.findPublicKeyByKey(key)
 140	if pk != nil {
 141		return db.ErrPublicKeyTaken
 142	}
 143	query := `INSERT INTO public_keys (user_id, public_key, name) VALUES ($1, $2, $3)`
 144	_, err := me.Db.Exec(query, userID, key, name)
 145	return err
 146}
 147
 148func (me *PsqlDB) UpdatePublicKey(pubkeyID, name string) (*db.PublicKey, error) {
 149	pk, err := me.findPublicKey(pubkeyID)
 150	if err != nil {
 151		return nil, err
 152	}
 153
 154	query := `UPDATE public_keys SET name=$1 WHERE id=$2;`
 155	_, err = me.Db.Exec(query, name, pk.ID)
 156	if err != nil {
 157		return nil, err
 158	}
 159
 160	pk, err = me.findPublicKey(pubkeyID)
 161	if err != nil {
 162		return nil, err
 163	}
 164	return pk, nil
 165}
 166
 167func (me *PsqlDB) findPublicKeyByKey(key string) (*db.PublicKey, error) {
 168	var keys []*db.PublicKey
 169	rs, err := me.Db.Queryx(`SELECT id, user_id, name, public_key, created_at FROM public_keys WHERE public_key = $1`, key)
 170	if err != nil {
 171		return nil, err
 172	}
 173	defer func() { _ = rs.Close() }()
 174
 175	for rs.Next() {
 176		pk := &db.PublicKey{}
 177		err := rs.Scan(&pk.ID, &pk.UserID, &pk.Name, &pk.Key, &pk.CreatedAt)
 178		if err != nil {
 179			return nil, err
 180		}
 181
 182		keys = append(keys, pk)
 183	}
 184
 185	if rs.Err() != nil {
 186		return nil, rs.Err()
 187	}
 188
 189	if len(keys) == 0 {
 190		return nil, fmt.Errorf("pubkey not found in our database: [%s]", key)
 191	}
 192
 193	// When we run PublicKeyByKey and there are multiple public keys returned from the database
 194	// that should mean that we don't have the correct username for this public key.
 195	// When that happens we need to reject the authentication and ask the user to provide the correct
 196	// username when using ssh.  So instead of `ssh <domain>` it should be `ssh user@<domain>`
 197	if len(keys) > 1 {
 198		return nil, &db.ErrMultiplePublicKeys{}
 199	}
 200
 201	return keys[0], nil
 202}
 203
 204func (me *PsqlDB) findPublicKey(pubkeyID string) (*db.PublicKey, error) {
 205	pk := &db.PublicKey{}
 206	err := me.Db.Get(pk, `SELECT * FROM public_keys WHERE id = $1`, pubkeyID)
 207	if err != nil {
 208		return nil, err
 209	}
 210	return pk, nil
 211}
 212
 213func (me *PsqlDB) FindKeysByUser(user *db.User) ([]*db.PublicKey, error) {
 214	var keys []*db.PublicKey
 215	err := me.Db.Select(&keys, `SELECT * FROM public_keys WHERE user_id = $1 ORDER BY created_at ASC`, user.ID)
 216	if err != nil {
 217		return nil, err
 218	}
 219	return keys, nil
 220}
 221
 222func (me *PsqlDB) RemoveKeys(keyIDs []string) error {
 223	param := "{" + strings.Join(keyIDs, ",") + "}"
 224	_, err := me.Db.Exec(`DELETE FROM public_keys WHERE id = ANY($1::uuid[])`, param)
 225	return err
 226}
 227
 228func (me *PsqlDB) FindUsersWithPost(space string) ([]*db.User, error) {
 229	var users []*db.User
 230	rs, err := me.Db.Queryx(
 231		`SELECT u.id, u.name, u.created_at
 232		FROM app_users u
 233		INNER JOIN posts ON u.id=posts.user_id
 234		WHERE cur_space='feeds'
 235		GROUP BY u.id, u.name, u.created_at
 236		ORDER BY name ASC`,
 237	)
 238	if err != nil {
 239		return users, err
 240	}
 241	defer func() { _ = rs.Close() }()
 242	for rs.Next() {
 243		var name sql.NullString
 244		user := &db.User{}
 245		err := rs.Scan(
 246			&user.ID,
 247			&name,
 248			&user.CreatedAt,
 249		)
 250		if err != nil {
 251			return users, err
 252		}
 253		user.Name = name.String
 254
 255		users = append(users, user)
 256	}
 257	if rs.Err() != nil {
 258		return users, rs.Err()
 259	}
 260	return users, nil
 261}
 262
 263func (me *PsqlDB) FindUserByKey(username string, key string) (*db.User, error) {
 264	me.Logger.Info("attempting to find user with only public key", "key", key)
 265	pk, err := me.findPublicKeyByKey(key)
 266	if err == nil {
 267		me.Logger.Info("found pubkey, looking for user", "key", key, "userId", pk.UserID)
 268		user, err := me.FindUser(pk.UserID)
 269		if err != nil {
 270			return nil, err
 271		}
 272		user.PublicKey = pk
 273		return user, nil
 274	}
 275
 276	if errors.Is(err, &db.ErrMultiplePublicKeys{}) {
 277		me.Logger.Info("detected multiple users with same public key", "user", username)
 278		user, err := me.findUserForNameAndKey(username, key)
 279		if err != nil {
 280			me.Logger.Info("could not find user by username and public key", "user", username, "key", key)
 281			// this is a little hacky but if we cannot find a user by name and public key
 282			// then we return the multiple keys detected error so the user knows to specify their
 283			// when logging in
 284			return nil, &db.ErrMultiplePublicKeys{}
 285		}
 286		return user, nil
 287	}
 288
 289	return nil, err
 290}
 291
 292func (me *PsqlDB) FindUserByPubkey(key string) (*db.User, error) {
 293	me.Logger.Info("attempting to find user with only public key", "key", key)
 294	pk, err := me.findPublicKeyByKey(key)
 295	if err != nil {
 296		return nil, err
 297	}
 298
 299	me.Logger.Info("found pubkey, looking for user", "key", key, "userId", pk.UserID)
 300	user, err := me.FindUser(pk.UserID)
 301	if err != nil {
 302		return nil, err
 303	}
 304	user.PublicKey = pk
 305	return user, nil
 306}
 307
 308func (me *PsqlDB) FindUser(userID string) (*db.User, error) {
 309	user := &db.User{}
 310	err := me.Db.Get(user, `SELECT id, COALESCE(name, '') as name, created_at FROM app_users WHERE id = $1`, userID)
 311	if err != nil {
 312		return nil, err
 313	}
 314	return user, nil
 315}
 316
 317func (me *PsqlDB) validateName(name string) (bool, error) {
 318	lower := strings.ToLower(name)
 319	if slices.Contains(db.DenyList, lower) {
 320		return false, fmt.Errorf("%s is on deny list: %w", lower, db.ErrNameDenied)
 321	}
 322	v := db.NameValidator.MatchString(lower)
 323	if !v {
 324		return false, fmt.Errorf("%s is invalid: %w", lower, db.ErrNameInvalid)
 325	}
 326	user, _ := me.FindUserByName(lower)
 327	if user == nil {
 328		return true, nil
 329	}
 330	return false, fmt.Errorf("%s already taken: %w", lower, db.ErrNameTaken)
 331}
 332
 333func (me *PsqlDB) FindUserByName(name string) (*db.User, error) {
 334	user := &db.User{}
 335	err := me.Db.Get(user, `SELECT * FROM app_users WHERE name = $1`, strings.ToLower(name))
 336	if err != nil {
 337		return nil, err
 338	}
 339	return user, nil
 340}
 341
 342func (me *PsqlDB) findUserForNameAndKey(name string, key string) (*db.User, error) {
 343	user := &db.User{}
 344	pk := &db.PublicKey{}
 345
 346	r := me.Db.QueryRow(`SELECT app_users.id, app_users.name, app_users.created_at, public_keys.id as pk_id, public_keys.public_key, public_keys.created_at as pk_created_at FROM app_users LEFT JOIN public_keys ON public_keys.user_id = app_users.id WHERE app_users.name = $1 AND public_keys.public_key = $2`, strings.ToLower(name), key)
 347	err := r.Scan(&user.ID, &user.Name, &user.CreatedAt, &pk.ID, &pk.Key, &pk.CreatedAt)
 348	if err != nil {
 349		return nil, err
 350	}
 351
 352	user.PublicKey = pk
 353	return user, nil
 354}
 355
 356func (me *PsqlDB) FindUserByToken(token string) (*db.User, error) {
 357	user := &db.User{}
 358	err := me.Db.Get(user, `
 359	SELECT app_users.id, app_users.name, app_users.created_at
 360	FROM app_users
 361	LEFT JOIN tokens ON tokens.user_id = app_users.id
 362	WHERE tokens.token = $1 AND tokens.expires_at > NOW()`, token)
 363	if err != nil {
 364		return nil, err
 365	}
 366	return user, nil
 367}
 368
 369func (me *PsqlDB) FindPostWithFilename(filename string, persona_id string, space string) (*db.Post, error) {
 370	query := fmt.Sprintf(`
 371	SELECT %s, STRING_AGG(coalesce(post_tags.name, ''), ',') tags
 372	FROM posts
 373	LEFT JOIN app_users ON app_users.id = posts.user_id
 374	LEFT JOIN post_tags ON post_tags.post_id = posts.id
 375	WHERE filename = $1 AND user_id = $2 AND cur_space = $3
 376	GROUP BY %s`, SelectPost, SelectPost)
 377	r := me.Db.QueryRow(query, filename, persona_id, space)
 378	post, err := CreatePostWithTagsByRow(r)
 379	if err != nil {
 380		return nil, err
 381	}
 382
 383	return post, nil
 384}
 385
 386func (me *PsqlDB) FindPostWithSlug(slug string, user_id string, space string) (*db.Post, error) {
 387	query := fmt.Sprintf(`
 388	SELECT %s, STRING_AGG(coalesce(post_tags.name, ''), ',') tags
 389	FROM posts
 390	LEFT JOIN app_users ON app_users.id = posts.user_id
 391	LEFT JOIN post_tags ON post_tags.post_id = posts.id
 392	WHERE slug = $1 AND user_id = $2 AND cur_space = $3
 393	GROUP BY %s`, SelectPost, SelectPost)
 394	r := me.Db.QueryRow(query, slug, user_id, space)
 395	post, err := CreatePostWithTagsByRow(r)
 396	if err != nil {
 397		// attempt to find post inside post_aliases
 398		alias := me.Db.QueryRow(
 399			`SELECT post_aliases.post_id FROM post_aliases
 400			INNER JOIN posts ON posts.id = post_aliases.post_id
 401			WHERE post_aliases.slug = $1 AND posts.user_id = $2`,
 402			slug, user_id,
 403		)
 404		postID := ""
 405		err := alias.Scan(&postID)
 406		if err != nil {
 407			return nil, err
 408		}
 409
 410		return me.FindPost(postID)
 411	}
 412
 413	return post, nil
 414}
 415
 416func (me *PsqlDB) FindPost(postID string) (*db.Post, error) {
 417	post := &db.Post{}
 418	query := fmt.Sprintf(`
 419	SELECT %s
 420	FROM posts
 421	LEFT JOIN app_users ON app_users.id = posts.user_id
 422	WHERE posts.id = $1`, SelectPost)
 423	err := me.Db.Get(post, query, postID)
 424	if err != nil {
 425		return nil, err
 426	}
 427	return post, nil
 428}
 429
 430func (me *PsqlDB) postPager(rs *sqlx.Rows, pageNum int, space string, tag string) (*db.Paginate[*db.Post], error) {
 431	var posts []*db.Post
 432	for rs.Next() {
 433		post := &db.Post{}
 434		err := rs.Scan(
 435			&post.ID,
 436			&post.UserID,
 437			&post.Filename,
 438			&post.Slug,
 439			&post.Title,
 440			&post.Text,
 441			&post.Description,
 442			&post.PublishAt,
 443			&post.Username,
 444			&post.UpdatedAt,
 445			&post.MimeType,
 446		)
 447		if err != nil {
 448			return nil, err
 449		}
 450
 451		posts = append(posts, post)
 452	}
 453	if rs.Err() != nil {
 454		return nil, rs.Err()
 455	}
 456
 457	var count int
 458	var err error
 459	if tag == "" {
 460		err = me.Db.QueryRow(`SELECT count(id) FROM posts WHERE hidden = FALSE AND cur_space=$1`, space).Scan(&count)
 461	} else {
 462		err = me.Db.QueryRow(`
 463	SELECT count(posts.id)
 464	FROM posts
 465	LEFT JOIN post_tags ON post_tags.post_id = posts.id
 466	WHERE hidden = FALSE AND cur_space=$1 and post_tags.name = $2`, space, tag).Scan(&count)
 467	}
 468	if err != nil {
 469		return nil, err
 470	}
 471
 472	pager := &db.Paginate[*db.Post]{
 473		Data:  posts,
 474		Total: int(math.Ceil(float64(count) / float64(pageNum))),
 475	}
 476
 477	return pager, nil
 478}
 479
 480func (me *PsqlDB) FindPostsByFeed(page *db.Pager, space string) (*db.Paginate[*db.Post], error) {
 481	query := `
 482	SELECT *
 483	FROM (
 484	    SELECT DISTINCT ON (posts.user_id)
 485	        posts.id,
 486	        posts.user_id,
 487	        posts.filename,
 488	        posts.slug,
 489	        posts.title,
 490	        posts.text,
 491	        posts.description,
 492	        posts.publish_at,
 493	        app_users.name AS username,
 494	        posts.updated_at,
 495	        posts.mime_type
 496	    FROM posts
 497	    LEFT JOIN app_users ON app_users.id = posts.user_id
 498	    WHERE
 499	        hidden = FALSE
 500	        AND publish_at::date <= CURRENT_DATE
 501	        AND cur_space = $3
 502	    ORDER BY posts.user_id, publish_at DESC
 503	) AS latest_posts
 504	ORDER BY publish_at DESC
 505	LIMIT $1 OFFSET $2`
 506	rs, err := me.Db.Queryx(query, page.Num, page.Num*page.Page, space)
 507	if err != nil {
 508		return nil, err
 509	}
 510	defer func() { _ = rs.Close() }()
 511	return me.postPager(rs, page.Num, space, "")
 512}
 513
 514func (me *PsqlDB) InsertPost(post *db.Post) (*db.Post, error) {
 515	var id string
 516	query := `
 517	INSERT INTO posts
 518		(user_id, filename, slug, title, text, description, publish_at, hidden, cur_space,
 519		file_size, mime_type, shasum, data, expires_at, updated_at)
 520	VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
 521	RETURNING id`
 522	err := me.Db.QueryRow(
 523		query,
 524		post.UserID,
 525		post.Filename,
 526		post.Slug,
 527		post.Title,
 528		post.Text,
 529		post.Description,
 530		post.PublishAt,
 531		post.Hidden,
 532		post.Space,
 533		post.FileSize,
 534		post.MimeType,
 535		post.Shasum,
 536		post.Data,
 537		post.ExpiresAt,
 538		post.UpdatedAt,
 539	).Scan(&id)
 540	if err != nil {
 541		return nil, err
 542	}
 543
 544	return me.FindPost(id)
 545}
 546
 547func (me *PsqlDB) UpdatePost(post *db.Post) (*db.Post, error) {
 548	query := `
 549	UPDATE posts
 550	SET slug = $1, title = $2, text = $3, description = $4, updated_at = $5, publish_at = $6,
 551		file_size = $7, shasum = $8, data = $9, hidden = $11, expires_at = $12
 552	WHERE id = $10`
 553	_, err := me.Db.Exec(
 554		query,
 555		post.Slug,
 556		post.Title,
 557		post.Text,
 558		post.Description,
 559		post.UpdatedAt,
 560		post.PublishAt,
 561		post.FileSize,
 562		post.Shasum,
 563		post.Data,
 564		post.ID,
 565		post.Hidden,
 566		post.ExpiresAt,
 567	)
 568	if err != nil {
 569		return nil, err
 570	}
 571
 572	return me.FindPost(post.ID)
 573}
 574
 575func (me *PsqlDB) RemovePosts(postIDs []string) error {
 576	param := "{" + strings.Join(postIDs, ",") + "}"
 577	_, err := me.Db.Exec(`DELETE FROM posts WHERE id = ANY($1::uuid[])`, param)
 578	return err
 579}
 580
 581func (me *PsqlDB) FindPostsByUser(page *db.Pager, userID string, space string) (*db.Paginate[*db.Post], error) {
 582	var posts []*db.Post
 583	query := fmt.Sprintf(`
 584	SELECT %s, STRING_AGG(coalesce(post_tags.name, ''), ',') tags
 585	FROM posts
 586	LEFT JOIN app_users ON app_users.id = posts.user_id
 587	LEFT JOIN post_tags ON post_tags.post_id = posts.id
 588	WHERE
 589		hidden = FALSE AND
 590		user_id = $1 AND
 591		publish_at::date <= CURRENT_DATE AND
 592		cur_space = $2
 593	GROUP BY %s
 594	ORDER BY publish_at DESC, slug DESC
 595	LIMIT $3 OFFSET $4`, SelectPost, SelectPost)
 596	rs, err := me.Db.Queryx(
 597		query,
 598		userID,
 599		space,
 600		page.Num,
 601		page.Num*page.Page,
 602	)
 603	if err != nil {
 604		return nil, err
 605	}
 606	defer func() { _ = rs.Close() }()
 607	for rs.Next() {
 608		post, err := CreatePostWithTagsByRow(rs)
 609		if err != nil {
 610			return nil, err
 611		}
 612
 613		posts = append(posts, post)
 614	}
 615
 616	if rs.Err() != nil {
 617		return nil, rs.Err()
 618	}
 619
 620	var count int
 621	err = me.Db.QueryRow(`SELECT count(id) FROM posts WHERE hidden = FALSE AND cur_space=$1`, space).Scan(&count)
 622	if err != nil {
 623		return nil, err
 624	}
 625
 626	pager := &db.Paginate[*db.Post]{
 627		Data:  posts,
 628		Total: int(math.Ceil(float64(count) / float64(page.Num))),
 629	}
 630	return pager, nil
 631}
 632
 633func (me *PsqlDB) FindAllPostsByUser(userID string, space string) ([]*db.Post, error) {
 634	var posts []*db.Post
 635	query := fmt.Sprintf(`
 636	SELECT %s
 637	FROM posts
 638	LEFT JOIN app_users ON app_users.id = posts.user_id
 639	WHERE
 640		user_id = $1 AND
 641		cur_space = $2
 642	ORDER BY publish_at DESC`, SelectPost)
 643	err := me.Db.Select(&posts, query, userID, space)
 644	if err != nil {
 645		return nil, err
 646	}
 647	return posts, nil
 648}
 649
 650func (me *PsqlDB) FindPosts() ([]*db.Post, error) {
 651	var posts []*db.Post
 652	query := fmt.Sprintf(`
 653	SELECT %s
 654	FROM posts
 655	LEFT JOIN app_users ON app_users.id = posts.user_id`, SelectPost)
 656	err := me.Db.Select(&posts, query)
 657	if err != nil {
 658		return nil, err
 659	}
 660	return posts, nil
 661}
 662
 663func (me *PsqlDB) FindExpiredPosts(space string) ([]*db.Post, error) {
 664	var posts []*db.Post
 665	query := fmt.Sprintf(`
 666		SELECT %s
 667		FROM posts
 668		LEFT JOIN app_users ON app_users.id = posts.user_id
 669		WHERE
 670			cur_space = $1 AND
 671			expires_at <= now();
 672	`, SelectPost)
 673	err := me.Db.Select(&posts, query, space)
 674	if err != nil {
 675		return nil, err
 676	}
 677	return posts, nil
 678}
 679
 680func (me *PsqlDB) Close() error {
 681	me.Logger.Info("Closing db")
 682	return me.Db.Close()
 683}
 684
 685func newNullString(s string) sql.NullString {
 686	if len(s) == 0 {
 687		return sql.NullString{}
 688	}
 689	return sql.NullString{
 690		String: s,
 691		Valid:  true,
 692	}
 693}
 694
 695func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
 696	_, err := me.Db.Exec(
 697		`INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
 698		visit.UserID,
 699		newNullString(visit.ProjectID),
 700		newNullString(visit.PostID),
 701		newNullString(visit.Namespace),
 702		visit.Host,
 703		visit.Path,
 704		visit.IpAddress,
 705		visit.UserAgent,
 706		visit.Referer,
 707		visit.Status,
 708		visit.ContentType,
 709	)
 710	return err
 711}
 712
 713func visitFilterBy(opts *db.SummaryOpts) (string, string) {
 714	where := ""
 715	val := ""
 716	if opts.Host != "" {
 717		where = "host"
 718		val = opts.Host
 719	} else if opts.Path != "" {
 720		where = "path"
 721		val = opts.Path
 722	}
 723
 724	return where, val
 725}
 726
 727func (me *PsqlDB) visitUnique(opts *db.SummaryOpts) ([]*db.VisitInterval, error) {
 728	var intervals, currentIntervals []*db.VisitInterval
 729	var sumErr, rawErr error
 730
 731	var wg sync.WaitGroup
 732	wg.Add(2)
 733
 734	go func() {
 735		defer wg.Done()
 736		intervals, sumErr = me.visitUniqueFromSummary(opts)
 737	}()
 738	go func() {
 739		defer wg.Done()
 740		currentIntervals, rawErr = me.visitUniqueFromRaw(opts)
 741	}()
 742
 743	wg.Wait()
 744
 745	if sumErr != nil {
 746		return nil, fmt.Errorf("query summary visits: %w", sumErr)
 747	}
 748	if rawErr != nil {
 749		return nil, fmt.Errorf("query raw visits: %w", rawErr)
 750	}
 751
 752	// Merge: current month data may overlap with summary data, combine counts
 753	return mergeVisitIntervals(intervals, currentIntervals), nil
 754}
 755
 756// visitUniqueFromSummary reads unique visitor counts from analytics_monthly_visits for historical data.
 757func (me *PsqlDB) visitUniqueFromSummary(opts *db.SummaryOpts) ([]*db.VisitInterval, error) {
 758	now := time.Now()
 759	currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
 760	previousMonthStart := currentMonthStart.AddDate(0, -1, 0)
 761
 762	// If origin is in the previous month or later, raw data covers it — no summary to fetch.
 763	if !opts.Origin.Before(previousMonthStart) {
 764		return nil, nil
 765	}
 766
 767	where := ""
 768	args := []interface{}{opts.UserID, opts.Origin, currentMonthStart}
 769	argIdx := 4
 770	if opts.Host != "" {
 771		where = "AND host = $" + fmt.Sprintf("%d", argIdx)
 772		args = append(args, opts.Host)
 773	}
 774
 775	query := fmt.Sprintf(`
 776		SELECT
 777			date_trunc('%s', visit_date)::timestamptz as interval_start,
 778			sum(unique_visits) as unique_visitors,
 779			sum(mobile_visits) as mobile_visits,
 780			sum(desktop_visits) as desktop_visits
 781		FROM analytics_monthly_visits
 782		WHERE user_id = $1 AND visit_date >= $2 AND visit_date < $3 %s
 783		GROUP BY interval_start
 784		ORDER BY interval_start`, opts.Interval, where)
 785
 786	rows, err := me.Db.Queryx(query, args...)
 787	if err != nil {
 788		return nil, err
 789	}
 790	defer func() { _ = rows.Close() }()
 791
 792	var intervals []*db.VisitInterval
 793	for rows.Next() {
 794		interval := &db.VisitInterval{}
 795		if err := rows.Scan(&interval.Interval, &interval.Visitors, &interval.MobileVisitors, &interval.DesktopVisitors); err != nil {
 796			return nil, err
 797		}
 798		intervals = append(intervals, interval)
 799	}
 800	return intervals, rows.Err()
 801}
 802
 803// visitUniqueFromRaw reads unique visitor counts from analytics_visits for the previous and current months.
 804// This covers the gap between the last aggregated month and the current month.
 805func (me *PsqlDB) visitUniqueFromRaw(opts *db.SummaryOpts) ([]*db.VisitInterval, error) {
 806	now := time.Now()
 807	currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
 808	previousMonthStart := currentMonthStart.AddDate(0, -1, 0)
 809
 810	where, with := visitFilterBy(opts)
 811
 812	// Determine the effective start: max(origin, previousMonthStart)
 813	effectiveStart := previousMonthStart
 814	if opts.Origin.After(previousMonthStart) {
 815		effectiveStart = opts.Origin
 816	}
 817
 818	uniqueVisitors := fmt.Sprintf(`
 819		SELECT
 820			date_trunc('%s', created_at)::timestamptz as interval_start,
 821			count(DISTINCT CASE WHEN %s THEN ip_address END) as mobile_visitors,
 822			count(DISTINCT CASE WHEN NOT %s THEN ip_address END) as desktop_visitors,
 823			count(DISTINCT ip_address) as unique_visitors
 824		FROM analytics_visits
 825		WHERE created_at >= $1 AND created_at < $2 AND %s = $3 AND user_id = $4 AND status <> 404
 826		GROUP BY interval_start
 827		ORDER BY interval_start`,
 828		opts.Interval, mobileUserAgentExpr, mobileUserAgentExpr, where)
 829
 830	rows, err := me.Db.Queryx(uniqueVisitors, effectiveStart, currentMonthStart.AddDate(0, 1, 0), with, opts.UserID)
 831	if err != nil {
 832		return nil, err
 833	}
 834	defer func() { _ = rows.Close() }()
 835
 836	var intervals []*db.VisitInterval
 837	for rows.Next() {
 838		interval := &db.VisitInterval{}
 839		if err := rows.Scan(&interval.Interval, &interval.MobileVisitors, &interval.DesktopVisitors, &interval.Visitors); err != nil {
 840			return nil, err
 841		}
 842		intervals = append(intervals, interval)
 843	}
 844	return intervals, rows.Err()
 845}
 846
 847// mergeVisitIntervals combines historical (summary table) and current (raw) intervals.
 848// Summary data is preferred when both sources have the same interval to avoid double-counting.
 849func mergeVisitIntervals(historical, current []*db.VisitInterval) []*db.VisitInterval {
 850	if len(historical) == 0 {
 851		return current
 852	}
 853	if len(current) == 0 {
 854		return historical
 855	}
 856
 857	// Build a map by interval timestamp for merging.
 858	// Summary data takes precedence over raw data for the same interval
 859	// (e.g. when a month is aggregated but raw data still exists in a local dump).
 860	intervalMap := make(map[int64]*db.VisitInterval)
 861	for _, ci := range current {
 862		ts := ci.Interval.Unix()
 863		intervalMap[ts] = ci
 864	}
 865	for _, hi := range historical {
 866		ts := hi.Interval.Unix()
 867		intervalMap[ts] = hi // summary overwrites raw if both exist
 868	}
 869
 870	// Sort by interval
 871	result := make([]*db.VisitInterval, 0, len(intervalMap))
 872	for _, iv := range intervalMap {
 873		result = append(result, iv)
 874	}
 875	sort.Slice(result, func(i, j int) bool {
 876		return result[i].Interval.Before(*result[j].Interval)
 877	})
 878	return result
 879}
 880
 881// mergeTopUrls combines historical and current top URLs, summing counts for overlapping URLs,
 882// then returns the top 10 by total count.
 883func mergeTopUrls(historical, current []*db.VisitUrl) []*db.VisitUrl {
 884	if len(historical) == 0 {
 885		return current
 886	}
 887	if len(current) == 0 {
 888		return historical
 889	}
 890
 891	// Build a map by URL for merging
 892	urlMap := make(map[string]*db.VisitUrl)
 893	for _, hu := range historical {
 894		urlMap[hu.Url] = hu
 895	}
 896
 897	for _, cu := range current {
 898		if existing, ok := urlMap[cu.Url]; ok {
 899			existing.Count += cu.Count
 900		} else {
 901			urlMap[cu.Url] = cu
 902		}
 903	}
 904
 905	// Sort by count descending and take top 10
 906	result := make([]*db.VisitUrl, 0, len(urlMap))
 907	for _, u := range urlMap {
 908		result = append(result, u)
 909	}
 910	sort.Slice(result, func(i, j int) bool {
 911		return result[i].Count > result[j].Count
 912	})
 913	if len(result) > 10 {
 914		result = result[:10]
 915	}
 916	return result
 917}
 918
 919// mergeTopReferers combines historical and current top referers, summing counts for overlapping referers,
 920// then returns the top 10 by total count.
 921func mergeTopReferers(historical, current []*db.VisitUrl) []*db.VisitUrl {
 922	return mergeTopUrls(historical, current) // Same logic as mergeTopUrls
 923}
 924
 925// mergeHosts combines historical and current hosts, summing counts for overlapping hosts,
 926// then returns sorted by total count descending.
 927func mergeHosts(historical, current []*db.VisitUrl) []*db.VisitUrl {
 928	if len(historical) == 0 {
 929		return current
 930	}
 931	if len(current) == 0 {
 932		return historical
 933	}
 934
 935	// Build a map by host for merging
 936	hostMap := make(map[string]*db.VisitUrl)
 937	for _, hu := range historical {
 938		hostMap[hu.Url] = hu
 939	}
 940
 941	for _, cu := range current {
 942		if existing, ok := hostMap[cu.Url]; ok {
 943			existing.Count += cu.Count
 944		} else {
 945			hostMap[cu.Url] = cu
 946		}
 947	}
 948
 949	// Sort by count descending
 950	result := make([]*db.VisitUrl, 0, len(hostMap))
 951	for _, h := range hostMap {
 952		result = append(result, h)
 953	}
 954	sort.Slice(result, func(i, j int) bool {
 955		return result[i].Count > result[j].Count
 956	})
 957	return result
 958}
 959
 960func (me *PsqlDB) visitReferer(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
 961	var historical, current []*db.VisitUrl
 962	var histErr, rawErr error
 963
 964	var wg sync.WaitGroup
 965	wg.Add(2)
 966
 967	go func() {
 968		defer wg.Done()
 969		historical, histErr = me.visitRefererFromSummary(opts)
 970	}()
 971	go func() {
 972		defer wg.Done()
 973		current, rawErr = me.visitRefererFromRaw(opts)
 974	}()
 975
 976	wg.Wait()
 977
 978	if histErr != nil {
 979		return nil, fmt.Errorf("query summary referers: %w", histErr)
 980	}
 981	if rawErr != nil {
 982		return nil, fmt.Errorf("query raw referers: %w", rawErr)
 983	}
 984
 985	return mergeTopReferers(historical, current), nil
 986}
 987
 988// visitRefererFromSummary reads top referers from analytics_monthly_top_referers for historical data.
 989func (me *PsqlDB) visitRefererFromSummary(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
 990	now := time.Now()
 991	currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
 992	previousMonthStart := currentMonthStart.AddDate(0, -1, 0)
 993
 994	// If origin is in the previous month or later, raw data covers it — no summary to fetch.
 995	if !opts.Origin.Before(previousMonthStart) {
 996		return nil, nil
 997	}
 998
 999	// Clamp origin to month boundary for summary table lookup
1000	originMonthStart := time.Date(opts.Origin.Year(), opts.Origin.Month(), 1, 0, 0, 0, 0, time.UTC)
1001
1002	where := ""
1003	args := []interface{}{opts.UserID, originMonthStart, currentMonthStart}
1004	if opts.Host != "" {
1005		where = "AND host = $4"
1006		args = append(args, opts.Host)
1007	}
1008
1009	query := fmt.Sprintf(`
1010		SELECT referer, sum(unique_visits) as total_visits
1011		FROM analytics_monthly_top_referers
1012		WHERE user_id = $1 AND month >= $2 AND month < $3 %s
1013		GROUP BY referer
1014		ORDER BY total_visits DESC
1015		LIMIT 10`, where)
1016
1017	rows, err := me.Db.Queryx(query, args...)
1018	if err != nil {
1019		return nil, err
1020	}
1021	defer func() { _ = rows.Close() }()
1022
1023	var results []*db.VisitUrl
1024	for rows.Next() {
1025		result := &db.VisitUrl{}
1026		if err := rows.Scan(&result.Url, &result.Count); err != nil {
1027			return nil, err
1028		}
1029		results = append(results, result)
1030	}
1031	return results, rows.Err()
1032}
1033
1034// visitRefererFromRaw reads top referers from analytics_visits for the previous and current months.
1035func (me *PsqlDB) visitRefererFromRaw(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
1036	now := time.Now()
1037	currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
1038	previousMonthStart := currentMonthStart.AddDate(0, -1, 0)
1039
1040	where, with := visitFilterBy(opts)
1041
1042	// Determine the effective start: max(origin, previousMonthStart)
1043	effectiveStart := previousMonthStart
1044	if opts.Origin.After(previousMonthStart) {
1045		effectiveStart = opts.Origin
1046	}
1047
1048	topUrls := fmt.Sprintf(`
1049		SELECT
1050			referer,
1051			count(DISTINCT ip_address) as referer_count
1052		FROM analytics_visits
1053		WHERE created_at >= $1 AND created_at < $2 AND %s = $3 AND user_id = $4 AND referer <> '' AND status <> 404
1054		GROUP BY referer
1055		ORDER BY referer_count DESC
1056		LIMIT 10`, where)
1057
1058	rows, err := me.Db.Queryx(topUrls, effectiveStart, currentMonthStart.AddDate(0, 1, 0), with, opts.UserID)
1059	if err != nil {
1060		return nil, err
1061	}
1062	defer func() { _ = rows.Close() }()
1063
1064	var results []*db.VisitUrl
1065	for rows.Next() {
1066		result := &db.VisitUrl{}
1067		if err := rows.Scan(&result.Url, &result.Count); err != nil {
1068			return nil, err
1069		}
1070		results = append(results, result)
1071	}
1072	return results, rows.Err()
1073}
1074
1075func (me *PsqlDB) visitUrl(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
1076	var historical, current []*db.VisitUrl
1077	var histErr, rawErr error
1078
1079	var wg sync.WaitGroup
1080	wg.Add(2)
1081
1082	go func() {
1083		defer wg.Done()
1084		historical, histErr = me.visitUrlFromSummary(opts)
1085	}()
1086	go func() {
1087		defer wg.Done()
1088		current, rawErr = me.visitUrlFromRaw(opts)
1089	}()
1090
1091	wg.Wait()
1092
1093	if histErr != nil {
1094		return nil, fmt.Errorf("query summary urls: %w", histErr)
1095	}
1096	if rawErr != nil {
1097		return nil, fmt.Errorf("query raw urls: %w", rawErr)
1098	}
1099
1100	return mergeTopUrls(historical, current), nil
1101}
1102
1103// visitUrlFromSummary reads top URLs from analytics_monthly_top_urls for historical data.
1104func (me *PsqlDB) visitUrlFromSummary(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
1105	now := time.Now()
1106	currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
1107	previousMonthStart := currentMonthStart.AddDate(0, -1, 0)
1108
1109	// If origin is in the previous month or later, raw data covers it — no summary to fetch.
1110	if !opts.Origin.Before(previousMonthStart) {
1111		return nil, nil
1112	}
1113
1114	// Clamp origin to month boundary for summary table lookup
1115	originMonthStart := time.Date(opts.Origin.Year(), opts.Origin.Month(), 1, 0, 0, 0, 0, time.UTC)
1116
1117	where := ""
1118	args := []interface{}{opts.UserID, originMonthStart, currentMonthStart}
1119	if opts.Host != "" {
1120		where = "AND host = $4"
1121		args = append(args, opts.Host)
1122	}
1123
1124	query := fmt.Sprintf(`
1125		SELECT path, sum(unique_visits) as total_visits
1126		FROM analytics_monthly_top_urls
1127		WHERE user_id = $1 AND month >= $2 AND month < $3 AND status_code <> 404 %s
1128		GROUP BY path
1129		ORDER BY total_visits DESC
1130		LIMIT 10`, where)
1131
1132	rows, err := me.Db.Queryx(query, args...)
1133	if err != nil {
1134		return nil, err
1135	}
1136	defer func() { _ = rows.Close() }()
1137
1138	var results []*db.VisitUrl
1139	for rows.Next() {
1140		result := &db.VisitUrl{}
1141		if err := rows.Scan(&result.Url, &result.Count); err != nil {
1142			return nil, err
1143		}
1144		results = append(results, result)
1145	}
1146	return results, rows.Err()
1147}
1148
1149// visitUrlFromRaw reads top URLs from analytics_visits for the previous and current months.
1150func (me *PsqlDB) visitUrlFromRaw(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
1151	now := time.Now()
1152	currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
1153	previousMonthStart := currentMonthStart.AddDate(0, -1, 0)
1154
1155	where, with := visitFilterBy(opts)
1156
1157	// Determine the effective start: max(origin, previousMonthStart)
1158	effectiveStart := previousMonthStart
1159	if opts.Origin.After(previousMonthStart) {
1160		effectiveStart = opts.Origin
1161	}
1162
1163	topUrls := fmt.Sprintf(`
1164		SELECT
1165			path,
1166			count(DISTINCT ip_address) as path_count
1167		FROM analytics_visits
1168		WHERE created_at >= $1 AND created_at < $2 AND %s = $3 AND user_id = $4 AND path <> '' AND status <> 404
1169		GROUP BY path
1170		ORDER BY path_count DESC
1171		LIMIT 10`, where)
1172
1173	rows, err := me.Db.Queryx(topUrls, effectiveStart, currentMonthStart.AddDate(0, 1, 0), with, opts.UserID)
1174	if err != nil {
1175		return nil, err
1176	}
1177	defer func() { _ = rows.Close() }()
1178
1179	var results []*db.VisitUrl
1180	for rows.Next() {
1181		result := &db.VisitUrl{}
1182		if err := rows.Scan(&result.Url, &result.Count); err != nil {
1183			return nil, err
1184		}
1185		results = append(results, result)
1186	}
1187	return results, rows.Err()
1188}
1189
1190func (me *PsqlDB) VisitUrlNotFound(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
1191	limit := opts.Limit
1192	if limit == 0 {
1193		limit = 10
1194	}
1195
1196	var historical, current []*db.VisitUrl
1197	var histErr, rawErr error
1198
1199	var wg sync.WaitGroup
1200	wg.Add(2)
1201
1202	go func() {
1203		defer wg.Done()
1204		historical, histErr = me.visitUrlNotFoundFromSummary(opts, limit)
1205	}()
1206	go func() {
1207		defer wg.Done()
1208		current, rawErr = me.visitUrlNotFoundFromRaw(opts, limit)
1209	}()
1210
1211	wg.Wait()
1212
1213	if histErr != nil {
1214		return nil, fmt.Errorf("query summary 404 urls: %w", histErr)
1215	}
1216	if rawErr != nil {
1217		return nil, fmt.Errorf("query raw 404 urls: %w", rawErr)
1218	}
1219
1220	return mergeTopUrls(historical, current), nil
1221}
1222
1223// visitUrlNotFoundFromSummary reads top 404 URLs from analytics_monthly_top_urls for historical data.
1224func (me *PsqlDB) visitUrlNotFoundFromSummary(opts *db.SummaryOpts, limit int) ([]*db.VisitUrl, error) {
1225	now := time.Now()
1226	currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
1227	previousMonthStart := currentMonthStart.AddDate(0, -1, 0)
1228
1229	// If origin is in the previous month or later, raw data covers it — no summary to fetch.
1230	if !opts.Origin.Before(previousMonthStart) {
1231		return nil, nil
1232	}
1233
1234	// Clamp origin to month boundary for summary table lookup
1235	originMonthStart := time.Date(opts.Origin.Year(), opts.Origin.Month(), 1, 0, 0, 0, 0, time.UTC)
1236
1237	where := ""
1238	args := []interface{}{opts.UserID, originMonthStart, currentMonthStart}
1239	argIdx := 4
1240	if opts.Host != "" {
1241		where = "AND host = $" + fmt.Sprintf("%d", argIdx)
1242		args = append(args, opts.Host)
1243	}
1244
1245	query := fmt.Sprintf(`
1246		SELECT path, sum(unique_visits) as total_visits
1247		FROM analytics_monthly_top_urls
1248		WHERE user_id = $1 AND month >= $2 AND month < $3 AND status_code = 404 %s
1249		GROUP BY path
1250		ORDER BY total_visits DESC
1251		LIMIT %d`, where, limit)
1252
1253	rows, err := me.Db.Queryx(query, args...)
1254	if err != nil {
1255		return nil, err
1256	}
1257	defer func() { _ = rows.Close() }()
1258
1259	var results []*db.VisitUrl
1260	for rows.Next() {
1261		result := &db.VisitUrl{}
1262		if err := rows.Scan(&result.Url, &result.Count); err != nil {
1263			return nil, err
1264		}
1265		results = append(results, result)
1266	}
1267	return results, rows.Err()
1268}
1269
1270// visitUrlNotFoundFromRaw reads top 404 URLs from analytics_visits for the previous and current months.
1271func (me *PsqlDB) visitUrlNotFoundFromRaw(opts *db.SummaryOpts, limit int) ([]*db.VisitUrl, error) {
1272	now := time.Now()
1273	currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
1274	previousMonthStart := currentMonthStart.AddDate(0, -1, 0)
1275
1276	where, with := visitFilterBy(opts)
1277
1278	// Determine the effective start: max(origin, previousMonthStart)
1279	effectiveStart := previousMonthStart
1280	if opts.Origin.After(previousMonthStart) {
1281		effectiveStart = opts.Origin
1282	}
1283
1284	topUrls := fmt.Sprintf(`
1285		SELECT
1286			path,
1287			count(DISTINCT ip_address) as path_count
1288		FROM analytics_visits
1289		WHERE created_at >= $1 AND created_at < $2 AND %s = $3 AND user_id = $4 AND path <> '' AND status = 404
1290		GROUP BY path
1291		ORDER BY path_count DESC
1292		LIMIT %d`, where, limit)
1293
1294	rows, err := me.Db.Queryx(topUrls, effectiveStart, currentMonthStart.AddDate(0, 1, 0), with, opts.UserID)
1295	if err != nil {
1296		return nil, err
1297	}
1298	defer func() { _ = rows.Close() }()
1299
1300	var results []*db.VisitUrl
1301	for rows.Next() {
1302		result := &db.VisitUrl{}
1303		if err := rows.Scan(&result.Url, &result.Count); err != nil {
1304			return nil, err
1305		}
1306		results = append(results, result)
1307	}
1308	return results, rows.Err()
1309}
1310
1311func (me *PsqlDB) visitHost(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
1312	var historical, current []*db.VisitUrl
1313	var histErr, rawErr error
1314
1315	var wg sync.WaitGroup
1316	wg.Add(2)
1317
1318	go func() {
1319		defer wg.Done()
1320		historical, histErr = me.visitHostFromSummary(opts)
1321	}()
1322	go func() {
1323		defer wg.Done()
1324		current, rawErr = me.visitHostFromRaw(opts)
1325	}()
1326
1327	wg.Wait()
1328
1329	if histErr != nil {
1330		return nil, fmt.Errorf("query summary hosts: %w", histErr)
1331	}
1332	if rawErr != nil {
1333		return nil, fmt.Errorf("query raw hosts: %w", rawErr)
1334	}
1335
1336	return mergeHosts(historical, current), nil
1337}
1338
1339// visitHostFromSummary reads host data from analytics_user_sites for historical data.
1340func (me *PsqlDB) visitHostFromSummary(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
1341	rows, err := me.Db.Queryx(`
1342		SELECT host, total_visits
1343		FROM analytics_user_sites
1344		WHERE user_id = $1 AND host <> ''
1345		ORDER BY total_visits DESC`, opts.UserID)
1346	if err != nil {
1347		return nil, err
1348	}
1349	defer func() { _ = rows.Close() }()
1350
1351	var results []*db.VisitUrl
1352	for rows.Next() {
1353		result := &db.VisitUrl{}
1354		if err := rows.Scan(&result.Url, &result.Count); err != nil {
1355			return nil, err
1356		}
1357		results = append(results, result)
1358	}
1359	return results, rows.Err()
1360}
1361
1362// visitHostFromRaw reads hosts from analytics_visits for the current month that aren't in summary.
1363func (me *PsqlDB) visitHostFromRaw(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
1364	now := time.Now()
1365	currentMonthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
1366
1367	rows, err := me.Db.Queryx(`
1368		SELECT host, count(DISTINCT ip_address) as host_count
1369		FROM analytics_visits
1370		WHERE created_at >= $1 AND user_id = $2 AND host <> ''
1371		GROUP BY host
1372		ORDER BY host_count DESC`, currentMonthStart, opts.UserID)
1373	if err != nil {
1374		return nil, err
1375	}
1376	defer func() { _ = rows.Close() }()
1377
1378	var results []*db.VisitUrl
1379	for rows.Next() {
1380		result := &db.VisitUrl{}
1381		if err := rows.Scan(&result.Url, &result.Count); err != nil {
1382			return nil, err
1383		}
1384		results = append(results, result)
1385	}
1386	return results, rows.Err()
1387}
1388
1389func (me *PsqlDB) VisitSummary(opts *db.SummaryOpts) (*db.SummaryVisits, error) {
1390	var (
1391		visitors    []*db.VisitInterval
1392		urls        []*db.VisitUrl
1393		refs        []*db.VisitUrl
1394		notFound    []*db.VisitUrl
1395		visitorsErr error
1396		urlsErr     error
1397		refsErr     error
1398		nfErr       error
1399	)
1400
1401	var wg sync.WaitGroup
1402	wg.Add(4)
1403
1404	go func() {
1405		defer wg.Done()
1406		visitors, visitorsErr = me.visitUnique(opts)
1407	}()
1408	go func() {
1409		defer wg.Done()
1410		urls, urlsErr = me.visitUrl(opts)
1411	}()
1412	go func() {
1413		defer wg.Done()
1414		refs, refsErr = me.visitReferer(opts)
1415	}()
1416	go func() {
1417		defer wg.Done()
1418		notFound, nfErr = me.VisitUrlNotFound(opts)
1419	}()
1420
1421	wg.Wait()
1422
1423	// Return the first error encountered
1424	for _, err := range []error{visitorsErr, urlsErr, refsErr, nfErr} {
1425		if err != nil {
1426			return nil, err
1427		}
1428	}
1429
1430	return &db.SummaryVisits{
1431		Intervals:    visitors,
1432		TopUrls:      urls,
1433		TopReferers:  refs,
1434		NotFoundUrls: notFound,
1435	}, nil
1436}
1437
1438func (me *PsqlDB) FindVisitSiteList(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
1439	return me.visitHost(opts)
1440}
1441
1442func (me *PsqlDB) FindUsers() ([]*db.User, error) {
1443	var users []*db.User
1444	err := me.Db.Select(&users, `SELECT id, COALESCE(name, '') as name, created_at FROM app_users ORDER BY name ASC`)
1445	if err != nil {
1446		return nil, err
1447	}
1448	return users, nil
1449}
1450
1451func (me *PsqlDB) removeTagsByPost(tx *sqlx.Tx, postID string) error {
1452	_, err := tx.Exec(`DELETE FROM post_tags WHERE post_id = $1`, postID)
1453	return err
1454}
1455
1456func (me *PsqlDB) insertTagsByPost(tx *sqlx.Tx, tags []string, postID string) ([]string, error) {
1457	ids := make([]string, 0)
1458	for _, tag := range tags {
1459		id := ""
1460		err := tx.QueryRow(`INSERT INTO post_tags (post_id, name) VALUES($1, $2) RETURNING id;`, postID, tag).Scan(&id)
1461		if err != nil {
1462			return nil, err
1463		}
1464		ids = append(ids, id)
1465	}
1466
1467	return ids, nil
1468}
1469
1470func (me *PsqlDB) ReplaceTagsByPost(tags []string, postID string) error {
1471	tx, err := me.Db.Beginx()
1472	if err != nil {
1473		return err
1474	}
1475	defer func() {
1476		_ = tx.Rollback()
1477	}()
1478
1479	err = me.removeTagsByPost(tx, postID)
1480	if err != nil {
1481		return err
1482	}
1483
1484	_, err = me.insertTagsByPost(tx, tags, postID)
1485	if err != nil {
1486		return err
1487	}
1488
1489	err = tx.Commit()
1490	return err
1491}
1492
1493func (me *PsqlDB) removeAliasesByPost(tx *sqlx.Tx, postID string) error {
1494	_, err := tx.Exec(`DELETE FROM post_aliases WHERE post_id = $1`, postID)
1495	return err
1496}
1497
1498func (me *PsqlDB) insertAliasesByPost(tx *sqlx.Tx, aliases []string, postID string) ([]string, error) {
1499	// hardcoded
1500	denyList := []string{
1501		"rss",
1502		"rss.xml",
1503		"rss.atom",
1504		"atom.xml",
1505		"feed.xml",
1506		"smol.css",
1507		"main.css",
1508		"syntax.css",
1509		"card.png",
1510		"favicon-16x16.png",
1511		"favicon-32x32.png",
1512		"apple-touch-icon.png",
1513		"favicon.ico",
1514		"robots.txt",
1515		"atom",
1516		"blog/index.xml",
1517	}
1518
1519	ids := make([]string, 0)
1520	for _, alias := range aliases {
1521		if slices.Contains(denyList, alias) {
1522			me.Logger.Info(
1523				"name is in the deny list for aliases because it conflicts with a static route, skipping",
1524				"alias", alias,
1525			)
1526			continue
1527		}
1528		id := ""
1529		err := tx.QueryRow(`INSERT INTO post_aliases (post_id, slug) VALUES($1, $2) RETURNING id;`, postID, alias).Scan(&id)
1530		if err != nil {
1531			return nil, err
1532		}
1533		ids = append(ids, id)
1534	}
1535
1536	return ids, nil
1537}
1538
1539func (me *PsqlDB) ReplaceAliasesByPost(aliases []string, postID string) error {
1540	tx, err := me.Db.Beginx()
1541	if err != nil {
1542		return err
1543	}
1544	defer func() {
1545		_ = tx.Rollback()
1546	}()
1547
1548	err = me.removeAliasesByPost(tx, postID)
1549	if err != nil {
1550		return err
1551	}
1552
1553	_, err = me.insertAliasesByPost(tx, aliases, postID)
1554	if err != nil {
1555		return err
1556	}
1557
1558	err = tx.Commit()
1559	return err
1560}
1561
1562func (me *PsqlDB) FindUserPostsByTag(page *db.Pager, tag, userID, space string) (*db.Paginate[*db.Post], error) {
1563	var posts []*db.Post
1564	query := fmt.Sprintf(`
1565	SELECT %s
1566	FROM posts
1567	LEFT JOIN app_users ON app_users.id = posts.user_id
1568	LEFT JOIN post_tags ON post_tags.post_id = posts.id
1569	WHERE
1570		hidden = FALSE AND
1571		user_id = $1 AND
1572		(post_tags.name = $2 OR hidden = true) AND
1573		publish_at::date <= CURRENT_DATE AND
1574		cur_space = $3
1575	ORDER BY publish_at DESC
1576	LIMIT $4 OFFSET $5`, SelectPost)
1577	err := me.Db.Select(
1578		&posts,
1579		query,
1580		userID,
1581		tag,
1582		space,
1583		page.Num,
1584		page.Num*page.Page,
1585	)
1586	if err != nil {
1587		return nil, err
1588	}
1589
1590	var count int
1591	err = me.Db.QueryRow(`SELECT count(id) FROM posts WHERE hidden = FALSE AND cur_space=$1`, space).Scan(&count)
1592	if err != nil {
1593		return nil, err
1594	}
1595
1596	pager := &db.Paginate[*db.Post]{
1597		Data:  posts,
1598		Total: int(math.Ceil(float64(count) / float64(page.Num))),
1599	}
1600	return pager, nil
1601}
1602
1603func (me *PsqlDB) FindPostsByTag(pager *db.Pager, tag, space string) (*db.Paginate[*db.Post], error) {
1604	query := `
1605	SELECT
1606		posts.id,
1607		user_id,
1608		filename,
1609		slug,
1610		title,
1611		text,
1612		description,
1613		publish_at,
1614		app_users.name as username,
1615		posts.updated_at,
1616		posts.mime_type
1617	FROM posts
1618	LEFT JOIN app_users ON app_users.id = posts.user_id
1619	LEFT JOIN post_tags ON post_tags.post_id = posts.id
1620	WHERE
1621		post_tags.name = $3 AND
1622		publish_at::date <= CURRENT_DATE AND
1623		cur_space = $4
1624	ORDER BY publish_at DESC
1625	LIMIT $1 OFFSET $2`
1626	rs, err := me.Db.Queryx(
1627		query,
1628		pager.Num,
1629		pager.Num*pager.Page,
1630		tag,
1631		space,
1632	)
1633	if err != nil {
1634		return nil, err
1635	}
1636	defer func() { _ = rs.Close() }()
1637
1638	return me.postPager(rs, pager.Num, space, tag)
1639}
1640
1641func (me *PsqlDB) FindPopularTags(space string) ([]string, error) {
1642	tags := make([]string, 0)
1643	query := `
1644	SELECT name, count(post_id) as "tally"
1645	FROM post_tags
1646	LEFT JOIN posts ON posts.id = post_id
1647	WHERE posts.cur_space = $1
1648	GROUP BY name
1649	ORDER BY tally DESC
1650	LIMIT 5`
1651	rs, err := me.Db.Queryx(query, space)
1652	if err != nil {
1653		return tags, err
1654	}
1655	defer func() { _ = rs.Close() }()
1656	for rs.Next() {
1657		name := ""
1658		tally := 0
1659		err := rs.Scan(&name, &tally)
1660		if err != nil {
1661			return tags, err
1662		}
1663
1664		tags = append(tags, name)
1665	}
1666	if rs.Err() != nil {
1667		return tags, rs.Err()
1668	}
1669	return tags, nil
1670}
1671
1672func (me *PsqlDB) FindFeature(userID string, feature string) (*db.FeatureFlag, error) {
1673	ff := &db.FeatureFlag{}
1674	err := me.Db.Get(ff, `SELECT * FROM feature_flags WHERE user_id = $1 AND name = $2 ORDER BY expires_at DESC LIMIT 1`, userID, feature)
1675	if err != nil {
1676		return nil, err
1677	}
1678	return ff, nil
1679}
1680
1681func (me *PsqlDB) FindFeaturesByUser(userID string) ([]*db.FeatureFlag, error) {
1682	var features []*db.FeatureFlag
1683	// https://stackoverflow.com/a/16920077
1684	query := `SELECT DISTINCT ON (name) *
1685		FROM feature_flags
1686		WHERE user_id=$1
1687		ORDER BY name, expires_at DESC;`
1688	err := me.Db.Select(&features, query, userID)
1689	if err != nil {
1690		return nil, err
1691	}
1692	return features, nil
1693}
1694
1695func (me *PsqlDB) HasFeatureByUser(userID string, feature string) bool {
1696	ff, err := me.FindFeature(userID, feature)
1697	if err != nil {
1698		return false
1699	}
1700	return ff.IsValid()
1701}
1702
1703func (me *PsqlDB) InsertFeedItems(postID string, items []*db.FeedItem) error {
1704	tx, err := me.Db.Beginx()
1705	if err != nil {
1706		return err
1707	}
1708	defer func() {
1709		_ = tx.Rollback()
1710	}()
1711
1712	for _, item := range items {
1713		_, err := tx.Exec(
1714			`INSERT INTO feed_items (post_id, guid, data) VALUES ($1, $2, $3) RETURNING id;`,
1715			item.PostID,
1716			item.GUID,
1717			item.Data,
1718		)
1719		if err != nil {
1720			return fmt.Errorf(
1721				"post id:%s, link:%s, guid:%s, err:%w",
1722				item.PostID, item.Data.Link, item.GUID, err,
1723			)
1724		}
1725	}
1726
1727	err = tx.Commit()
1728	return err
1729}
1730
1731func (me *PsqlDB) FindFeedItemsByPostID(postID string) ([]*db.FeedItem, error) {
1732	var items []*db.FeedItem
1733	err := me.Db.Select(&items, `SELECT * FROM feed_items WHERE post_id=$1`, postID)
1734	if err != nil {
1735		return nil, err
1736	}
1737	return items, nil
1738}
1739
1740func (me *PsqlDB) InsertProject(userID, name, projectDir string) (string, error) {
1741	if !shared.IsValidSubdomain(name) {
1742		return "", fmt.Errorf("'%s' is not a valid project name, must match /^[a-z0-9-]+$/", name)
1743	}
1744
1745	var id string
1746	err := me.Db.QueryRow(`INSERT INTO projects (user_id, name, project_dir) VALUES ($1, $2, $3) RETURNING id;`, userID, name, projectDir).Scan(&id)
1747	if err != nil {
1748		return "", err
1749	}
1750	return id, nil
1751}
1752
1753func (me *PsqlDB) UpdateProject(userID, name string) error {
1754	_, err := me.Db.Exec(`UPDATE projects SET updated_at = $3 WHERE user_id = $1 AND name = $2;`, userID, name, time.Now())
1755	return err
1756}
1757
1758func (me *PsqlDB) FindProjectByName(userID, name string) (*db.Project, error) {
1759	project := &db.Project{}
1760	err := me.Db.Get(project, `SELECT * FROM projects WHERE user_id = $1 AND name = $2;`, userID, name)
1761	if err != nil {
1762		return nil, err
1763	}
1764	return project, nil
1765}
1766
1767func (me *PsqlDB) InsertToken(userID, name string) (string, error) {
1768	var token string
1769	err := me.Db.QueryRow(`INSERT INTO tokens (user_id, name) VALUES($1, $2) RETURNING token;`, userID, name).Scan(&token)
1770	if err != nil {
1771		return "", err
1772	}
1773	return token, nil
1774}
1775
1776func (me *PsqlDB) UpsertToken(userID, name string) (string, error) {
1777	token, _ := me.findTokenByName(userID, name)
1778	if token != "" {
1779		return token, nil
1780	}
1781
1782	token, err := me.InsertToken(userID, name)
1783	return token, err
1784}
1785
1786func (me *PsqlDB) findTokenByName(userID, name string) (string, error) {
1787	var token string
1788	err := me.Db.QueryRow(`SELECT token FROM tokens WHERE user_id = $1 AND name = $2`, userID, name).Scan(&token)
1789	if err != nil {
1790		return "", err
1791	}
1792	return token, nil
1793}
1794
1795func (me *PsqlDB) RemoveToken(tokenID string) error {
1796	_, err := me.Db.Exec(`DELETE FROM tokens WHERE id = $1`, tokenID)
1797	return err
1798}
1799
1800func (me *PsqlDB) FindTokensByUser(userID string) ([]*db.Token, error) {
1801	var tokens []*db.Token
1802	err := me.Db.Select(&tokens, `SELECT * FROM tokens WHERE user_id = $1`, userID)
1803	if err != nil {
1804		return nil, err
1805	}
1806	return tokens, nil
1807}
1808
1809func (me *PsqlDB) InsertFeature(userID, name string, expiresAt time.Time) (*db.FeatureFlag, error) {
1810	var featureID string
1811	err := me.Db.QueryRow(
1812		`INSERT INTO feature_flags (user_id, name, expires_at) VALUES ($1, $2, $3) RETURNING id;`,
1813		userID,
1814		name,
1815		expiresAt,
1816	).Scan(&featureID)
1817	if err != nil {
1818		return nil, err
1819	}
1820
1821	feature, err := me.FindFeature(userID, name)
1822	if err != nil {
1823		return nil, err
1824	}
1825
1826	return feature, nil
1827}
1828
1829func (me *PsqlDB) RemoveFeature(userID string, name string) error {
1830	_, err := me.Db.Exec(`DELETE FROM feature_flags WHERE user_id = $1 AND name = $2`, userID, name)
1831	return err
1832}
1833
1834func (me *PsqlDB) createFeatureExpiresAt(userID, name string) time.Time {
1835	ff, _ := me.FindFeature(userID, name)
1836	// if the feature flag has already expired we don't want to add a year to it since that will
1837	// not grant the user a full year
1838	if ff == nil || !ff.IsValid() {
1839		t := time.Now()
1840		return t.AddDate(1, 0, 0)
1841	}
1842	return ff.ExpiresAt.AddDate(1, 0, 0)
1843}
1844
1845func (me *PsqlDB) AddPicoPlusUser(username, email, paymentType, txId string) error {
1846	user, err := me.FindUserByName(username)
1847	if err != nil {
1848		return err
1849	}
1850
1851	tx, err := me.Db.Beginx()
1852	if err != nil {
1853		return err
1854	}
1855	defer func() {
1856		_ = tx.Rollback()
1857	}()
1858
1859	var paymentHistoryId sql.NullString
1860	if paymentType != "" {
1861		data := db.PaymentHistoryData{
1862			Notes: "",
1863			TxID:  txId,
1864		}
1865
1866		err := tx.QueryRow(
1867			`INSERT INTO payment_history (user_id, payment_type, amount, data) VALUES ($1, $2, 24 * 1000000, $3) RETURNING id;`,
1868			user.ID,
1869			paymentType,
1870			data,
1871		).Scan(&paymentHistoryId)
1872		if err != nil {
1873			return err
1874		}
1875	}
1876
1877	plus := me.createFeatureExpiresAt(user.ID, "plus")
1878	plusQuery := fmt.Sprintf(`INSERT INTO feature_flags (user_id, name, data, expires_at, payment_history_id)
1879	VALUES ($1, 'plus', '{"storage_max":10000000000, "file_max":100000000, "email": "%s"}'::jsonb, $2, $3);`, email)
1880	_, err = tx.Exec(plusQuery, user.ID, plus, paymentHistoryId)
1881	if err != nil {
1882		return err
1883	}
1884
1885	return tx.Commit()
1886}
1887
1888func (me *PsqlDB) UpsertProject(userID, projectName, projectDir string) (*db.Project, error) {
1889	project, err := me.FindProjectByName(userID, projectName)
1890	if err == nil {
1891		// this just updates the `createdAt` timestamp, useful for book-keeping
1892		err = me.UpdateProject(userID, projectName)
1893		if err != nil {
1894			me.Logger.Error("could not update project", "err", err)
1895			return nil, err
1896		}
1897		return project, nil
1898	}
1899
1900	_, err = me.InsertProject(userID, projectName, projectName)
1901	if err != nil {
1902		me.Logger.Error("could not create project", "err", err)
1903		return nil, err
1904	}
1905	return me.FindProjectByName(userID, projectName)
1906}
1907
1908func (me *PsqlDB) findPagesStats(userID string) (*db.UserServiceStats, error) {
1909	stats := db.UserServiceStats{
1910		Service: "pgs",
1911	}
1912	err := me.Db.QueryRow(
1913		`SELECT count(id), min(created_at), max(created_at), max(updated_at) FROM projects WHERE user_id=$1`,
1914		userID,
1915	).Scan(&stats.Num, &stats.FirstCreatedAt, &stats.LastestCreatedAt, &stats.LatestUpdatedAt)
1916	if err != nil {
1917		return nil, err
1918	}
1919
1920	return &stats, nil
1921}
1922
1923func (me *PsqlDB) InsertTunsEventLog(log *db.TunsEventLog) error {
1924	_, err := me.Db.Exec(
1925		`INSERT INTO tuns_event_logs
1926			(user_id, server_id, remote_addr, event_type, tunnel_type, connection_type, tunnel_id)
1927		VALUES
1928			($1, $2, $3, $4, $5, $6, $7)`,
1929		log.UserId, log.ServerID, log.RemoteAddr, log.EventType, log.TunnelType,
1930		log.ConnectionType, log.TunnelID,
1931	)
1932	return err
1933}
1934
1935func (me *PsqlDB) FindTunsEventLogsByAddr(userID, addr string) ([]*db.TunsEventLog, error) {
1936	var logs []*db.TunsEventLog
1937	err := me.Db.Select(&logs,
1938		`SELECT * FROM tuns_event_logs WHERE user_id=$1 AND tunnel_id=$2 ORDER BY created_at DESC`, userID, addr)
1939	if err != nil {
1940		return nil, err
1941	}
1942	return logs, nil
1943}
1944
1945func (me *PsqlDB) FindTunsEventLogs(userID string) ([]*db.TunsEventLog, error) {
1946	var logs []*db.TunsEventLog
1947	err := me.Db.Select(&logs,
1948		`SELECT * FROM tuns_event_logs WHERE user_id=$1 ORDER BY created_at DESC`, userID)
1949	if err != nil {
1950		return nil, err
1951	}
1952	return logs, nil
1953}
1954
1955func (me *PsqlDB) FindUserStats(userID string) (*db.UserStats, error) {
1956	stats := db.UserStats{}
1957	rs, err := me.Db.Queryx(`SELECT cur_space, count(id), min(created_at), max(created_at), max(updated_at) FROM posts WHERE user_id=$1 GROUP BY cur_space`, userID)
1958	if err != nil {
1959		return nil, err
1960	}
1961	defer func() { _ = rs.Close() }()
1962
1963	for rs.Next() {
1964		stat := db.UserServiceStats{}
1965		err := rs.Scan(&stat.Service, &stat.Num, &stat.FirstCreatedAt, &stat.LastestCreatedAt, &stat.LatestUpdatedAt)
1966		if err != nil {
1967			return nil, err
1968		}
1969		switch stat.Service {
1970		case "prose":
1971			stats.Prose = stat
1972		case "pastes":
1973			stats.Pastes = stat
1974		case "feeds":
1975			stats.Feeds = stat
1976		}
1977	}
1978
1979	if rs.Err() != nil {
1980		return nil, rs.Err()
1981	}
1982
1983	pgs, err := me.findPagesStats(userID)
1984	if err != nil {
1985		return nil, err
1986	}
1987	stats.Pages = *pgs
1988	return &stats, err
1989}
1990
1991func (me *PsqlDB) FindAccessLogs(userID string, fromDate *time.Time) ([]*db.AccessLog, error) {
1992	var logs []*db.AccessLog
1993	err := me.Db.Select(&logs, `SELECT * FROM access_logs WHERE user_id=$1 AND created_at >= $2 ORDER BY created_at ASC`, userID, fromDate)
1994	if err != nil {
1995		return nil, err
1996	}
1997	return logs, nil
1998}
1999
2000func (me *PsqlDB) FindAccessLogsByPubkey(pubkey string, fromDate *time.Time) ([]*db.AccessLog, error) {
2001	var logs []*db.AccessLog
2002	err := me.Db.Select(&logs, `SELECT * FROM access_logs WHERE pubkey=$1 AND created_at >= $2 ORDER BY created_at ASC`, pubkey, fromDate)
2003	if err != nil {
2004		return nil, err
2005	}
2006	return logs, nil
2007}
2008
2009func (me *PsqlDB) FindPubkeysInAccessLogs(userID string) ([]string, error) {
2010	var pubkeys []string
2011	err := me.Db.Select(&pubkeys, `SELECT DISTINCT(pubkey) FROM access_logs WHERE user_id=$1`, userID)
2012	if err != nil {
2013		return nil, err
2014	}
2015	return pubkeys, nil
2016}
2017
2018func (me *PsqlDB) InsertAccessLog(log *db.AccessLog) error {
2019	_, err := me.Db.Exec(
2020		`INSERT INTO access_logs (user_id, service, pubkey, identity) VALUES ($1, $2, $3, $4);`,
2021		log.UserID,
2022		log.Service,
2023		log.Pubkey,
2024		log.Identity,
2025	)
2026	return err
2027}
2028
2029func (me *PsqlDB) UpsertPipeMonitor(userID, topic string, dur time.Duration, winEnd *time.Time) error {
2030	durStr := fmt.Sprintf("%d seconds", int64(dur.Seconds()))
2031	_, err := me.Db.Exec(
2032		`INSERT INTO pipe_monitors (user_id, topic, window_dur, window_end)
2033		VALUES ($1, $2, $3::interval, $4)
2034		ON CONFLICT (user_id, topic) DO UPDATE SET window_dur = $3::interval, window_end = $4, updated_at = NOW();`,
2035		userID,
2036		topic,
2037		durStr,
2038		winEnd,
2039	)
2040	return err
2041}
2042
2043func (me *PsqlDB) UpdatePipeMonitorLastPing(userID, topic string, lastPing *time.Time) error {
2044	_, err := me.Db.Exec(
2045		`UPDATE pipe_monitors SET last_ping = $3, updated_at = NOW() WHERE user_id = $1 AND topic = $2;`,
2046		userID,
2047		topic,
2048		lastPing,
2049	)
2050	return err
2051}
2052
2053func (me *PsqlDB) RemovePipeMonitor(userID, topic string) error {
2054	_, err := me.Db.Exec(
2055		`DELETE FROM pipe_monitors WHERE user_id = $1 AND topic = $2;`,
2056		userID,
2057		topic,
2058	)
2059	return err
2060}
2061
2062func (me *PsqlDB) FindPipeMonitorByTopic(userID, topic string) (*db.PipeMonitor, error) {
2063	monitor := &db.PipeMonitor{}
2064	err := me.Db.Get(monitor, `SELECT id, user_id, topic, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors WHERE user_id = $1 AND topic = $2;`, userID, topic)
2065	if err != nil {
2066		return nil, err
2067	}
2068	return monitor, nil
2069}
2070
2071func (me *PsqlDB) FindPipeMonitorsByUser(userID string) ([]*db.PipeMonitor, error) {
2072	var monitors []*db.PipeMonitor
2073	err := me.Db.Select(&monitors, `SELECT id, user_id, topic, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors WHERE user_id = $1 ORDER BY topic;`, userID)
2074	if err != nil {
2075		return nil, err
2076	}
2077	return monitors, nil
2078}
2079
2080func (me *PsqlDB) InsertPipeMonitorHistory(monitorID string, windowDur time.Duration, windowEnd, lastPing *time.Time) error {
2081	durStr := fmt.Sprintf("%d seconds", int64(windowDur.Seconds()))
2082	_, err := me.Db.Exec(
2083		`INSERT INTO pipe_monitors_history (monitor_id, window_dur, window_end, last_ping) VALUES ($1, $2::interval, $3, $4)`,
2084		monitorID, durStr, windowEnd, lastPing,
2085	)
2086	return err
2087}
2088
2089func (me *PsqlDB) FindPipeMonitorHistory(monitorID string, from, to time.Time) ([]*db.PipeMonitorHistory, error) {
2090	var history []*db.PipeMonitorHistory
2091	err := me.Db.Select(
2092		&history,
2093		`SELECT id, monitor_id, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors_history WHERE monitor_id = $1 AND last_ping <= $2 AND window_end >= $3 ORDER BY last_ping ASC`,
2094		monitorID, to, from,
2095	)
2096	if err != nil {
2097		return nil, err
2098	}
2099	return history, nil
2100}