Eric Bower
·
2026-01-25
storage.go
1package postgres
2
3import (
4 "database/sql"
5 "errors"
6 "fmt"
7 "log/slog"
8 "math"
9 "strings"
10 "time"
11
12 "slices"
13
14 "github.com/jmoiron/sqlx"
15 _ "github.com/lib/pq"
16 "github.com/picosh/pico/pkg/db"
17 "github.com/picosh/pico/pkg/shared"
18)
19
var (
	// PAGER_SIZE is presumably the default page size for paginated listings;
	// nothing in the visible part of this file reads it — confirm with callers.
	PAGER_SIZE = 15

	// SelectPost is the column list spliced into the SELECT clause (and, for
	// the tag-aggregating queries, the GROUP BY clause) of the post queries.
	SelectPost = `
	posts.id, user_id, app_users.name, filename, slug, title, text, description,
	posts.created_at, publish_at, posts.updated_at, hidden, file_size, mime_type, shasum, data, expires_at, views`
)
25
26type PsqlDB struct {
27 Logger *slog.Logger
28 Db *sqlx.DB
29}
30
// RowScanner is the single Scan method that CreatePostWithTagsByRow needs.
// This file passes it both the result of QueryRow and an *sqlx.Rows cursor.
type RowScanner interface {
	// Scan copies the current row's columns into dest, in order.
	Scan(dest ...any) error
}
34
35func CreatePostWithTagsByRow(r RowScanner) (*db.Post, error) {
36 post := &db.Post{}
37 tagStr := ""
38 err := r.Scan(
39 &post.ID,
40 &post.UserID,
41 &post.Username,
42 &post.Filename,
43 &post.Slug,
44 &post.Title,
45 &post.Text,
46 &post.Description,
47 &post.CreatedAt,
48 &post.PublishAt,
49 &post.UpdatedAt,
50 &post.Hidden,
51 &post.FileSize,
52 &post.MimeType,
53 &post.Shasum,
54 &post.Data,
55 &post.ExpiresAt,
56 &post.Views,
57 &tagStr,
58 )
59 if err != nil {
60 return nil, err
61 }
62
63 tags := strings.Split(tagStr, ",")
64 for _, tag := range tags {
65 tg := strings.TrimSpace(tag)
66 if tg == "" {
67 continue
68 }
69 post.Tags = append(post.Tags, tg)
70 }
71
72 return post, nil
73}
74
75func NewDB(databaseUrl string, logger *slog.Logger) *PsqlDB {
76 var err error
77 d := &PsqlDB{
78 Logger: logger,
79 }
80 d.Logger.Info("Connecting to postgres", "databaseUrl", databaseUrl)
81
82 db, err := sqlx.Connect("postgres", databaseUrl)
83 if err != nil {
84 d.Logger.Error(err.Error())
85 }
86 d.Db = db
87 return d
88}
89
90func (me *PsqlDB) RegisterUser(username, pubkey, comment string) (*db.User, error) {
91 lowerName := strings.ToLower(username)
92 valid, err := me.validateName(lowerName)
93 if !valid {
94 return nil, err
95 }
96
97 tx, err := me.Db.Beginx()
98 if err != nil {
99 return nil, err
100 }
101 defer func() {
102 _ = tx.Rollback()
103 }()
104
105 var id string
106 err = tx.QueryRow(`INSERT INTO app_users (name) VALUES($1) returning id`, lowerName).Scan(&id)
107 if err != nil {
108 return nil, err
109 }
110
111 err = me.insertPublicKeyWithTx(id, pubkey, comment, tx)
112 if err != nil {
113 return nil, err
114 }
115
116 err = tx.Commit()
117 if err != nil {
118 return nil, err
119 }
120
121 return me.FindUserByKey(username, pubkey)
122}
123
124func (me *PsqlDB) insertPublicKeyWithTx(userID, key, name string, tx *sqlx.Tx) error {
125 pk, _ := me.findPublicKeyByKey(key)
126 if pk != nil {
127 return db.ErrPublicKeyTaken
128 }
129 query := `INSERT INTO public_keys (user_id, public_key, name) VALUES ($1, $2, $3)`
130 _, err := tx.Exec(query, userID, key, name)
131 return err
132}
133
134func (me *PsqlDB) InsertPublicKey(userID, key, name string) error {
135 pk, _ := me.findPublicKeyByKey(key)
136 if pk != nil {
137 return db.ErrPublicKeyTaken
138 }
139 query := `INSERT INTO public_keys (user_id, public_key, name) VALUES ($1, $2, $3)`
140 _, err := me.Db.Exec(query, userID, key, name)
141 return err
142}
143
144func (me *PsqlDB) UpdatePublicKey(pubkeyID, name string) (*db.PublicKey, error) {
145 pk, err := me.findPublicKey(pubkeyID)
146 if err != nil {
147 return nil, err
148 }
149
150 query := `UPDATE public_keys SET name=$1 WHERE id=$2;`
151 _, err = me.Db.Exec(query, name, pk.ID)
152 if err != nil {
153 return nil, err
154 }
155
156 pk, err = me.findPublicKey(pubkeyID)
157 if err != nil {
158 return nil, err
159 }
160 return pk, nil
161}
162
163func (me *PsqlDB) findPublicKeyByKey(key string) (*db.PublicKey, error) {
164 var keys []*db.PublicKey
165 rs, err := me.Db.Queryx(`SELECT id, user_id, name, public_key, created_at FROM public_keys WHERE public_key = $1`, key)
166 if err != nil {
167 return nil, err
168 }
169 defer func() { _ = rs.Close() }()
170
171 for rs.Next() {
172 pk := &db.PublicKey{}
173 err := rs.Scan(&pk.ID, &pk.UserID, &pk.Name, &pk.Key, &pk.CreatedAt)
174 if err != nil {
175 return nil, err
176 }
177
178 keys = append(keys, pk)
179 }
180
181 if rs.Err() != nil {
182 return nil, rs.Err()
183 }
184
185 if len(keys) == 0 {
186 return nil, fmt.Errorf("pubkey not found in our database: [%s]", key)
187 }
188
189 // When we run PublicKeyByKey and there are multiple public keys returned from the database
190 // that should mean that we don't have the correct username for this public key.
191 // When that happens we need to reject the authentication and ask the user to provide the correct
192 // username when using ssh. So instead of `ssh <domain>` it should be `ssh user@<domain>`
193 if len(keys) > 1 {
194 return nil, &db.ErrMultiplePublicKeys{}
195 }
196
197 return keys[0], nil
198}
199
200func (me *PsqlDB) findPublicKey(pubkeyID string) (*db.PublicKey, error) {
201 pk := &db.PublicKey{}
202 err := me.Db.Get(pk, `SELECT * FROM public_keys WHERE id = $1`, pubkeyID)
203 if err != nil {
204 return nil, err
205 }
206 return pk, nil
207}
208
209func (me *PsqlDB) FindKeysByUser(user *db.User) ([]*db.PublicKey, error) {
210 var keys []*db.PublicKey
211 err := me.Db.Select(&keys, `SELECT * FROM public_keys WHERE user_id = $1 ORDER BY created_at ASC`, user.ID)
212 if err != nil {
213 return nil, err
214 }
215 return keys, nil
216}
217
218func (me *PsqlDB) RemoveKeys(keyIDs []string) error {
219 param := "{" + strings.Join(keyIDs, ",") + "}"
220 _, err := me.Db.Exec(`DELETE FROM public_keys WHERE id = ANY($1::uuid[])`, param)
221 return err
222}
223
224func (me *PsqlDB) FindUsersWithPost(space string) ([]*db.User, error) {
225 var users []*db.User
226 rs, err := me.Db.Queryx(
227 `SELECT u.id, u.name, u.created_at
228 FROM app_users u
229 INNER JOIN posts ON u.id=posts.user_id
230 WHERE cur_space='feeds'
231 GROUP BY u.id, u.name, u.created_at
232 ORDER BY name ASC`,
233 )
234 if err != nil {
235 return users, err
236 }
237 defer func() { _ = rs.Close() }()
238 for rs.Next() {
239 var name sql.NullString
240 user := &db.User{}
241 err := rs.Scan(
242 &user.ID,
243 &name,
244 &user.CreatedAt,
245 )
246 if err != nil {
247 return users, err
248 }
249 user.Name = name.String
250
251 users = append(users, user)
252 }
253 if rs.Err() != nil {
254 return users, rs.Err()
255 }
256 return users, nil
257}
258
259func (me *PsqlDB) FindUserByKey(username string, key string) (*db.User, error) {
260 me.Logger.Info("attempting to find user with only public key", "key", key)
261 pk, err := me.findPublicKeyByKey(key)
262 if err == nil {
263 me.Logger.Info("found pubkey, looking for user", "key", key, "userId", pk.UserID)
264 user, err := me.FindUser(pk.UserID)
265 if err != nil {
266 return nil, err
267 }
268 user.PublicKey = pk
269 return user, nil
270 }
271
272 if errors.Is(err, &db.ErrMultiplePublicKeys{}) {
273 me.Logger.Info("detected multiple users with same public key", "user", username)
274 user, err := me.findUserForNameAndKey(username, key)
275 if err != nil {
276 me.Logger.Info("could not find user by username and public key", "user", username, "key", key)
277 // this is a little hacky but if we cannot find a user by name and public key
278 // then we return the multiple keys detected error so the user knows to specify their
279 // when logging in
280 return nil, &db.ErrMultiplePublicKeys{}
281 }
282 return user, nil
283 }
284
285 return nil, err
286}
287
288func (me *PsqlDB) FindUserByPubkey(key string) (*db.User, error) {
289 me.Logger.Info("attempting to find user with only public key", "key", key)
290 pk, err := me.findPublicKeyByKey(key)
291 if err != nil {
292 return nil, err
293 }
294
295 me.Logger.Info("found pubkey, looking for user", "key", key, "userId", pk.UserID)
296 user, err := me.FindUser(pk.UserID)
297 if err != nil {
298 return nil, err
299 }
300 user.PublicKey = pk
301 return user, nil
302}
303
304func (me *PsqlDB) FindUser(userID string) (*db.User, error) {
305 user := &db.User{}
306 err := me.Db.Get(user, `SELECT id, COALESCE(name, '') as name, created_at FROM app_users WHERE id = $1`, userID)
307 if err != nil {
308 return nil, err
309 }
310 return user, nil
311}
312
313func (me *PsqlDB) validateName(name string) (bool, error) {
314 lower := strings.ToLower(name)
315 if slices.Contains(db.DenyList, lower) {
316 return false, fmt.Errorf("%s is on deny list: %w", lower, db.ErrNameDenied)
317 }
318 v := db.NameValidator.MatchString(lower)
319 if !v {
320 return false, fmt.Errorf("%s is invalid: %w", lower, db.ErrNameInvalid)
321 }
322 user, _ := me.FindUserByName(lower)
323 if user == nil {
324 return true, nil
325 }
326 return false, fmt.Errorf("%s already taken: %w", lower, db.ErrNameTaken)
327}
328
329func (me *PsqlDB) FindUserByName(name string) (*db.User, error) {
330 user := &db.User{}
331 err := me.Db.Get(user, `SELECT * FROM app_users WHERE name = $1`, strings.ToLower(name))
332 if err != nil {
333 return nil, err
334 }
335 return user, nil
336}
337
338func (me *PsqlDB) findUserForNameAndKey(name string, key string) (*db.User, error) {
339 user := &db.User{}
340 pk := &db.PublicKey{}
341
342 r := me.Db.QueryRow(`SELECT app_users.id, app_users.name, app_users.created_at, public_keys.id as pk_id, public_keys.public_key, public_keys.created_at as pk_created_at FROM app_users LEFT JOIN public_keys ON public_keys.user_id = app_users.id WHERE app_users.name = $1 AND public_keys.public_key = $2`, strings.ToLower(name), key)
343 err := r.Scan(&user.ID, &user.Name, &user.CreatedAt, &pk.ID, &pk.Key, &pk.CreatedAt)
344 if err != nil {
345 return nil, err
346 }
347
348 user.PublicKey = pk
349 return user, nil
350}
351
352func (me *PsqlDB) FindUserByToken(token string) (*db.User, error) {
353 user := &db.User{}
354 err := me.Db.Get(user, `
355 SELECT app_users.id, app_users.name, app_users.created_at
356 FROM app_users
357 LEFT JOIN tokens ON tokens.user_id = app_users.id
358 WHERE tokens.token = $1 AND tokens.expires_at > NOW()`, token)
359 if err != nil {
360 return nil, err
361 }
362 return user, nil
363}
364
365func (me *PsqlDB) FindPostWithFilename(filename string, persona_id string, space string) (*db.Post, error) {
366 query := fmt.Sprintf(`
367 SELECT %s, STRING_AGG(coalesce(post_tags.name, ''), ',') tags
368 FROM posts
369 LEFT JOIN app_users ON app_users.id = posts.user_id
370 LEFT JOIN post_tags ON post_tags.post_id = posts.id
371 WHERE filename = $1 AND user_id = $2 AND cur_space = $3
372 GROUP BY %s`, SelectPost, SelectPost)
373 r := me.Db.QueryRow(query, filename, persona_id, space)
374 post, err := CreatePostWithTagsByRow(r)
375 if err != nil {
376 return nil, err
377 }
378
379 return post, nil
380}
381
382func (me *PsqlDB) FindPostWithSlug(slug string, user_id string, space string) (*db.Post, error) {
383 query := fmt.Sprintf(`
384 SELECT %s, STRING_AGG(coalesce(post_tags.name, ''), ',') tags
385 FROM posts
386 LEFT JOIN app_users ON app_users.id = posts.user_id
387 LEFT JOIN post_tags ON post_tags.post_id = posts.id
388 WHERE slug = $1 AND user_id = $2 AND cur_space = $3
389 GROUP BY %s`, SelectPost, SelectPost)
390 r := me.Db.QueryRow(query, slug, user_id, space)
391 post, err := CreatePostWithTagsByRow(r)
392 if err != nil {
393 // attempt to find post inside post_aliases
394 alias := me.Db.QueryRow(
395 `SELECT post_aliases.post_id FROM post_aliases
396 INNER JOIN posts ON posts.id = post_aliases.post_id
397 WHERE post_aliases.slug = $1 AND posts.user_id = $2`,
398 slug, user_id,
399 )
400 postID := ""
401 err := alias.Scan(&postID)
402 if err != nil {
403 return nil, err
404 }
405
406 return me.FindPost(postID)
407 }
408
409 return post, nil
410}
411
412func (me *PsqlDB) FindPost(postID string) (*db.Post, error) {
413 post := &db.Post{}
414 query := fmt.Sprintf(`
415 SELECT %s
416 FROM posts
417 LEFT JOIN app_users ON app_users.id = posts.user_id
418 WHERE posts.id = $1`, SelectPost)
419 err := me.Db.Get(post, query, postID)
420 if err != nil {
421 return nil, err
422 }
423 return post, nil
424}
425
426func (me *PsqlDB) postPager(rs *sqlx.Rows, pageNum int, space string, tag string) (*db.Paginate[*db.Post], error) {
427 var posts []*db.Post
428 for rs.Next() {
429 post := &db.Post{}
430 err := rs.Scan(
431 &post.ID,
432 &post.UserID,
433 &post.Filename,
434 &post.Slug,
435 &post.Title,
436 &post.Text,
437 &post.Description,
438 &post.PublishAt,
439 &post.Username,
440 &post.UpdatedAt,
441 &post.MimeType,
442 )
443 if err != nil {
444 return nil, err
445 }
446
447 posts = append(posts, post)
448 }
449 if rs.Err() != nil {
450 return nil, rs.Err()
451 }
452
453 var count int
454 var err error
455 if tag == "" {
456 err = me.Db.QueryRow(`SELECT count(id) FROM posts WHERE hidden = FALSE AND cur_space=$1`, space).Scan(&count)
457 } else {
458 err = me.Db.QueryRow(`
459 SELECT count(posts.id)
460 FROM posts
461 LEFT JOIN post_tags ON post_tags.post_id = posts.id
462 WHERE hidden = FALSE AND cur_space=$1 and post_tags.name = $2`, space, tag).Scan(&count)
463 }
464 if err != nil {
465 return nil, err
466 }
467
468 pager := &db.Paginate[*db.Post]{
469 Data: posts,
470 Total: int(math.Ceil(float64(count) / float64(pageNum))),
471 }
472
473 return pager, nil
474}
475
476func (me *PsqlDB) FindPostsByFeed(page *db.Pager, space string) (*db.Paginate[*db.Post], error) {
477 query := `
478 SELECT *
479 FROM (
480 SELECT DISTINCT ON (posts.user_id)
481 posts.id,
482 posts.user_id,
483 posts.filename,
484 posts.slug,
485 posts.title,
486 posts.text,
487 posts.description,
488 posts.publish_at,
489 app_users.name AS username,
490 posts.updated_at,
491 posts.mime_type
492 FROM posts
493 LEFT JOIN app_users ON app_users.id = posts.user_id
494 WHERE
495 hidden = FALSE
496 AND publish_at::date <= CURRENT_DATE
497 AND cur_space = $3
498 ORDER BY posts.user_id, publish_at DESC
499 ) AS latest_posts
500 ORDER BY publish_at DESC
501 LIMIT $1 OFFSET $2`
502 rs, err := me.Db.Queryx(query, page.Num, page.Num*page.Page, space)
503 if err != nil {
504 return nil, err
505 }
506 defer func() { _ = rs.Close() }()
507 return me.postPager(rs, page.Num, space, "")
508}
509
510func (me *PsqlDB) InsertPost(post *db.Post) (*db.Post, error) {
511 var id string
512 query := `
513 INSERT INTO posts
514 (user_id, filename, slug, title, text, description, publish_at, hidden, cur_space,
515 file_size, mime_type, shasum, data, expires_at, updated_at)
516 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
517 RETURNING id`
518 err := me.Db.QueryRow(
519 query,
520 post.UserID,
521 post.Filename,
522 post.Slug,
523 post.Title,
524 post.Text,
525 post.Description,
526 post.PublishAt,
527 post.Hidden,
528 post.Space,
529 post.FileSize,
530 post.MimeType,
531 post.Shasum,
532 post.Data,
533 post.ExpiresAt,
534 post.UpdatedAt,
535 ).Scan(&id)
536 if err != nil {
537 return nil, err
538 }
539
540 return me.FindPost(id)
541}
542
543func (me *PsqlDB) UpdatePost(post *db.Post) (*db.Post, error) {
544 query := `
545 UPDATE posts
546 SET slug = $1, title = $2, text = $3, description = $4, updated_at = $5, publish_at = $6,
547 file_size = $7, shasum = $8, data = $9, hidden = $11, expires_at = $12
548 WHERE id = $10`
549 _, err := me.Db.Exec(
550 query,
551 post.Slug,
552 post.Title,
553 post.Text,
554 post.Description,
555 post.UpdatedAt,
556 post.PublishAt,
557 post.FileSize,
558 post.Shasum,
559 post.Data,
560 post.ID,
561 post.Hidden,
562 post.ExpiresAt,
563 )
564 if err != nil {
565 return nil, err
566 }
567
568 return me.FindPost(post.ID)
569}
570
571func (me *PsqlDB) RemovePosts(postIDs []string) error {
572 param := "{" + strings.Join(postIDs, ",") + "}"
573 _, err := me.Db.Exec(`DELETE FROM posts WHERE id = ANY($1::uuid[])`, param)
574 return err
575}
576
577func (me *PsqlDB) FindPostsByUser(page *db.Pager, userID string, space string) (*db.Paginate[*db.Post], error) {
578 var posts []*db.Post
579 query := fmt.Sprintf(`
580 SELECT %s, STRING_AGG(coalesce(post_tags.name, ''), ',') tags
581 FROM posts
582 LEFT JOIN app_users ON app_users.id = posts.user_id
583 LEFT JOIN post_tags ON post_tags.post_id = posts.id
584 WHERE
585 hidden = FALSE AND
586 user_id = $1 AND
587 publish_at::date <= CURRENT_DATE AND
588 cur_space = $2
589 GROUP BY %s
590 ORDER BY publish_at DESC, slug DESC
591 LIMIT $3 OFFSET $4`, SelectPost, SelectPost)
592 rs, err := me.Db.Queryx(
593 query,
594 userID,
595 space,
596 page.Num,
597 page.Num*page.Page,
598 )
599 if err != nil {
600 return nil, err
601 }
602 defer func() { _ = rs.Close() }()
603 for rs.Next() {
604 post, err := CreatePostWithTagsByRow(rs)
605 if err != nil {
606 return nil, err
607 }
608
609 posts = append(posts, post)
610 }
611
612 if rs.Err() != nil {
613 return nil, rs.Err()
614 }
615
616 var count int
617 err = me.Db.QueryRow(`SELECT count(id) FROM posts WHERE hidden = FALSE AND cur_space=$1`, space).Scan(&count)
618 if err != nil {
619 return nil, err
620 }
621
622 pager := &db.Paginate[*db.Post]{
623 Data: posts,
624 Total: int(math.Ceil(float64(count) / float64(page.Num))),
625 }
626 return pager, nil
627}
628
629func (me *PsqlDB) FindAllPostsByUser(userID string, space string) ([]*db.Post, error) {
630 var posts []*db.Post
631 query := fmt.Sprintf(`
632 SELECT %s
633 FROM posts
634 LEFT JOIN app_users ON app_users.id = posts.user_id
635 WHERE
636 user_id = $1 AND
637 cur_space = $2
638 ORDER BY publish_at DESC`, SelectPost)
639 err := me.Db.Select(&posts, query, userID, space)
640 if err != nil {
641 return nil, err
642 }
643 return posts, nil
644}
645
646func (me *PsqlDB) FindPosts() ([]*db.Post, error) {
647 var posts []*db.Post
648 query := fmt.Sprintf(`
649 SELECT %s
650 FROM posts
651 LEFT JOIN app_users ON app_users.id = posts.user_id`, SelectPost)
652 err := me.Db.Select(&posts, query)
653 if err != nil {
654 return nil, err
655 }
656 return posts, nil
657}
658
659func (me *PsqlDB) FindExpiredPosts(space string) ([]*db.Post, error) {
660 var posts []*db.Post
661 query := fmt.Sprintf(`
662 SELECT %s
663 FROM posts
664 LEFT JOIN app_users ON app_users.id = posts.user_id
665 WHERE
666 cur_space = $1 AND
667 expires_at <= now();
668 `, SelectPost)
669 err := me.Db.Select(&posts, query, space)
670 if err != nil {
671 return nil, err
672 }
673 return posts, nil
674}
675
676func (me *PsqlDB) Close() error {
677 me.Logger.Info("Closing db")
678 return me.Db.Close()
679}
680
// newNullString converts s to a sql.NullString, mapping the empty string to
// SQL NULL (Valid == false) and anything else to a valid value.
func newNullString(s string) sql.NullString {
	return sql.NullString{
		String: s,
		Valid:  s != "",
	}
}
690
691func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
692 _, err := me.Db.Exec(
693 `INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
694 visit.UserID,
695 newNullString(visit.ProjectID),
696 newNullString(visit.PostID),
697 newNullString(visit.Namespace),
698 visit.Host,
699 visit.Path,
700 visit.IpAddress,
701 visit.UserAgent,
702 visit.Referer,
703 visit.Status,
704 visit.ContentType,
705 )
706 return err
707}
708
709func visitFilterBy(opts *db.SummaryOpts) (string, string) {
710 where := ""
711 val := ""
712 if opts.Host != "" {
713 where = "host"
714 val = opts.Host
715 } else if opts.Path != "" {
716 where = "path"
717 val = opts.Path
718 }
719
720 return where, val
721}
722
723func (me *PsqlDB) visitUnique(opts *db.SummaryOpts) ([]*db.VisitInterval, error) {
724 where, with := visitFilterBy(opts)
725 uniqueVisitors := fmt.Sprintf(`SELECT
726 date_trunc('%s', created_at) as interval_start,
727 count(DISTINCT ip_address) as unique_visitors
728 FROM analytics_visits
729 WHERE created_at >= $1 AND %s = $2 AND user_id = $3 AND status <> 404
730 GROUP BY interval_start`, opts.Interval, where)
731
732 intervals := []*db.VisitInterval{}
733 rs, err := me.Db.Queryx(uniqueVisitors, opts.Origin, with, opts.UserID)
734 if err != nil {
735 return nil, err
736 }
737 defer func() { _ = rs.Close() }()
738
739 for rs.Next() {
740 interval := &db.VisitInterval{}
741 err := rs.Scan(
742 &interval.Interval,
743 &interval.Visitors,
744 )
745 if err != nil {
746 return nil, err
747 }
748
749 intervals = append(intervals, interval)
750 }
751 if rs.Err() != nil {
752 return nil, rs.Err()
753 }
754 return intervals, nil
755}
756
757func (me *PsqlDB) visitReferer(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
758 where, with := visitFilterBy(opts)
759 topUrls := fmt.Sprintf(`SELECT
760 referer,
761 count(DISTINCT ip_address) as referer_count
762 FROM analytics_visits
763 WHERE created_at >= $1 AND %s = $2 AND user_id = $3 AND referer <> '' AND status <> 404
764 GROUP BY referer
765 ORDER BY referer_count DESC
766 LIMIT 10`, where)
767
768 intervals := []*db.VisitUrl{}
769 rs, err := me.Db.Queryx(topUrls, opts.Origin, with, opts.UserID)
770 if err != nil {
771 return nil, err
772 }
773 defer func() { _ = rs.Close() }()
774
775 for rs.Next() {
776 interval := &db.VisitUrl{}
777 err := rs.Scan(
778 &interval.Url,
779 &interval.Count,
780 )
781 if err != nil {
782 return nil, err
783 }
784
785 intervals = append(intervals, interval)
786 }
787 if rs.Err() != nil {
788 return nil, rs.Err()
789 }
790 return intervals, nil
791}
792
793func (me *PsqlDB) visitUrl(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
794 where, with := visitFilterBy(opts)
795 topUrls := fmt.Sprintf(`SELECT
796 path,
797 count(DISTINCT ip_address) as path_count
798 FROM analytics_visits
799 WHERE created_at >= $1 AND %s = $2 AND user_id = $3 AND path <> '' AND status <> 404
800 GROUP BY path
801 ORDER BY path_count DESC
802 LIMIT 10`, where)
803
804 intervals := []*db.VisitUrl{}
805 rs, err := me.Db.Queryx(topUrls, opts.Origin, with, opts.UserID)
806 if err != nil {
807 return nil, err
808 }
809 defer func() { _ = rs.Close() }()
810
811 for rs.Next() {
812 interval := &db.VisitUrl{}
813 err := rs.Scan(
814 &interval.Url,
815 &interval.Count,
816 )
817 if err != nil {
818 return nil, err
819 }
820
821 intervals = append(intervals, interval)
822 }
823 if rs.Err() != nil {
824 return nil, rs.Err()
825 }
826 return intervals, nil
827}
828
829func (me *PsqlDB) VisitUrlNotFound(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
830 limit := opts.Limit
831 if limit == 0 {
832 limit = 10
833 }
834 where, with := visitFilterBy(opts)
835 topUrls := fmt.Sprintf(`SELECT
836 path,
837 count(DISTINCT ip_address) as path_count
838 FROM analytics_visits
839 WHERE created_at >= $1 AND %s = $2 AND user_id = $3 AND path <> '' AND status = 404
840 GROUP BY path
841 ORDER BY path_count DESC
842 LIMIT %d`, where, limit)
843
844 intervals := []*db.VisitUrl{}
845 rs, err := me.Db.Queryx(topUrls, opts.Origin, with, opts.UserID)
846 if err != nil {
847 return nil, err
848 }
849 defer func() { _ = rs.Close() }()
850
851 for rs.Next() {
852 interval := &db.VisitUrl{}
853 err := rs.Scan(
854 &interval.Url,
855 &interval.Count,
856 )
857 if err != nil {
858 return nil, err
859 }
860
861 intervals = append(intervals, interval)
862 }
863 if rs.Err() != nil {
864 return nil, rs.Err()
865 }
866 return intervals, nil
867}
868
869func (me *PsqlDB) visitHost(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
870 topUrls := `SELECT
871 host,
872 count(DISTINCT ip_address) as host_count
873 FROM analytics_visits
874 WHERE user_id = $1 AND host <> ''
875 GROUP BY host
876 ORDER BY host_count DESC`
877
878 intervals := []*db.VisitUrl{}
879 rs, err := me.Db.Queryx(topUrls, opts.UserID)
880 if err != nil {
881 return nil, err
882 }
883 defer func() { _ = rs.Close() }()
884
885 for rs.Next() {
886 interval := &db.VisitUrl{}
887 err := rs.Scan(
888 &interval.Url,
889 &interval.Count,
890 )
891 if err != nil {
892 return nil, err
893 }
894
895 intervals = append(intervals, interval)
896 }
897 if rs.Err() != nil {
898 return nil, rs.Err()
899 }
900 return intervals, nil
901}
902
903func (me *PsqlDB) VisitSummary(opts *db.SummaryOpts) (*db.SummaryVisits, error) {
904 visitors, err := me.visitUnique(opts)
905 if err != nil {
906 return nil, err
907 }
908
909 urls, err := me.visitUrl(opts)
910 if err != nil {
911 return nil, err
912 }
913
914 notFound, err := me.VisitUrlNotFound(opts)
915 if err != nil {
916 return nil, err
917 }
918
919 refs, err := me.visitReferer(opts)
920 if err != nil {
921 return nil, err
922 }
923
924 return &db.SummaryVisits{
925 Intervals: visitors,
926 TopUrls: urls,
927 TopReferers: refs,
928 NotFoundUrls: notFound,
929 }, nil
930}
931
932func (me *PsqlDB) FindVisitSiteList(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
933 return me.visitHost(opts)
934}
935
936func (me *PsqlDB) FindUsers() ([]*db.User, error) {
937 var users []*db.User
938 err := me.Db.Select(&users, `SELECT id, COALESCE(name, '') as name, created_at FROM app_users ORDER BY name ASC`)
939 if err != nil {
940 return nil, err
941 }
942 return users, nil
943}
944
945func (me *PsqlDB) removeTagsByPost(tx *sqlx.Tx, postID string) error {
946 _, err := tx.Exec(`DELETE FROM post_tags WHERE post_id = $1`, postID)
947 return err
948}
949
950func (me *PsqlDB) insertTagsByPost(tx *sqlx.Tx, tags []string, postID string) ([]string, error) {
951 ids := make([]string, 0)
952 for _, tag := range tags {
953 id := ""
954 err := tx.QueryRow(`INSERT INTO post_tags (post_id, name) VALUES($1, $2) RETURNING id;`, postID, tag).Scan(&id)
955 if err != nil {
956 return nil, err
957 }
958 ids = append(ids, id)
959 }
960
961 return ids, nil
962}
963
964func (me *PsqlDB) ReplaceTagsByPost(tags []string, postID string) error {
965 tx, err := me.Db.Beginx()
966 if err != nil {
967 return err
968 }
969 defer func() {
970 _ = tx.Rollback()
971 }()
972
973 err = me.removeTagsByPost(tx, postID)
974 if err != nil {
975 return err
976 }
977
978 _, err = me.insertTagsByPost(tx, tags, postID)
979 if err != nil {
980 return err
981 }
982
983 err = tx.Commit()
984 return err
985}
986
987func (me *PsqlDB) removeAliasesByPost(tx *sqlx.Tx, postID string) error {
988 _, err := tx.Exec(`DELETE FROM post_aliases WHERE post_id = $1`, postID)
989 return err
990}
991
992func (me *PsqlDB) insertAliasesByPost(tx *sqlx.Tx, aliases []string, postID string) ([]string, error) {
993 // hardcoded
994 denyList := []string{
995 "rss",
996 "rss.xml",
997 "rss.atom",
998 "atom.xml",
999 "feed.xml",
1000 "smol.css",
1001 "main.css",
1002 "syntax.css",
1003 "card.png",
1004 "favicon-16x16.png",
1005 "favicon-32x32.png",
1006 "apple-touch-icon.png",
1007 "favicon.ico",
1008 "robots.txt",
1009 "atom",
1010 "blog/index.xml",
1011 }
1012
1013 ids := make([]string, 0)
1014 for _, alias := range aliases {
1015 if slices.Contains(denyList, alias) {
1016 me.Logger.Info(
1017 "name is in the deny list for aliases because it conflicts with a static route, skipping",
1018 "alias", alias,
1019 )
1020 continue
1021 }
1022 id := ""
1023 err := tx.QueryRow(`INSERT INTO post_aliases (post_id, slug) VALUES($1, $2) RETURNING id;`, postID, alias).Scan(&id)
1024 if err != nil {
1025 return nil, err
1026 }
1027 ids = append(ids, id)
1028 }
1029
1030 return ids, nil
1031}
1032
1033func (me *PsqlDB) ReplaceAliasesByPost(aliases []string, postID string) error {
1034 tx, err := me.Db.Beginx()
1035 if err != nil {
1036 return err
1037 }
1038 defer func() {
1039 _ = tx.Rollback()
1040 }()
1041
1042 err = me.removeAliasesByPost(tx, postID)
1043 if err != nil {
1044 return err
1045 }
1046
1047 _, err = me.insertAliasesByPost(tx, aliases, postID)
1048 if err != nil {
1049 return err
1050 }
1051
1052 err = tx.Commit()
1053 return err
1054}
1055
1056func (me *PsqlDB) FindUserPostsByTag(page *db.Pager, tag, userID, space string) (*db.Paginate[*db.Post], error) {
1057 var posts []*db.Post
1058 query := fmt.Sprintf(`
1059 SELECT %s
1060 FROM posts
1061 LEFT JOIN app_users ON app_users.id = posts.user_id
1062 LEFT JOIN post_tags ON post_tags.post_id = posts.id
1063 WHERE
1064 hidden = FALSE AND
1065 user_id = $1 AND
1066 (post_tags.name = $2 OR hidden = true) AND
1067 publish_at::date <= CURRENT_DATE AND
1068 cur_space = $3
1069 ORDER BY publish_at DESC
1070 LIMIT $4 OFFSET $5`, SelectPost)
1071 err := me.Db.Select(
1072 &posts,
1073 query,
1074 userID,
1075 tag,
1076 space,
1077 page.Num,
1078 page.Num*page.Page,
1079 )
1080 if err != nil {
1081 return nil, err
1082 }
1083
1084 var count int
1085 err = me.Db.QueryRow(`SELECT count(id) FROM posts WHERE hidden = FALSE AND cur_space=$1`, space).Scan(&count)
1086 if err != nil {
1087 return nil, err
1088 }
1089
1090 pager := &db.Paginate[*db.Post]{
1091 Data: posts,
1092 Total: int(math.Ceil(float64(count) / float64(page.Num))),
1093 }
1094 return pager, nil
1095}
1096
1097func (me *PsqlDB) FindPostsByTag(pager *db.Pager, tag, space string) (*db.Paginate[*db.Post], error) {
1098 query := `
1099 SELECT
1100 posts.id,
1101 user_id,
1102 filename,
1103 slug,
1104 title,
1105 text,
1106 description,
1107 publish_at,
1108 app_users.name as username,
1109 posts.updated_at,
1110 posts.mime_type
1111 FROM posts
1112 LEFT JOIN app_users ON app_users.id = posts.user_id
1113 LEFT JOIN post_tags ON post_tags.post_id = posts.id
1114 WHERE
1115 post_tags.name = $3 AND
1116 publish_at::date <= CURRENT_DATE AND
1117 cur_space = $4
1118 ORDER BY publish_at DESC
1119 LIMIT $1 OFFSET $2`
1120 rs, err := me.Db.Queryx(
1121 query,
1122 pager.Num,
1123 pager.Num*pager.Page,
1124 tag,
1125 space,
1126 )
1127 if err != nil {
1128 return nil, err
1129 }
1130 defer func() { _ = rs.Close() }()
1131
1132 return me.postPager(rs, pager.Num, space, tag)
1133}
1134
1135func (me *PsqlDB) FindPopularTags(space string) ([]string, error) {
1136 tags := make([]string, 0)
1137 query := `
1138 SELECT name, count(post_id) as "tally"
1139 FROM post_tags
1140 LEFT JOIN posts ON posts.id = post_id
1141 WHERE posts.cur_space = $1
1142 GROUP BY name
1143 ORDER BY tally DESC
1144 LIMIT 5`
1145 rs, err := me.Db.Queryx(query, space)
1146 if err != nil {
1147 return tags, err
1148 }
1149 defer func() { _ = rs.Close() }()
1150 for rs.Next() {
1151 name := ""
1152 tally := 0
1153 err := rs.Scan(&name, &tally)
1154 if err != nil {
1155 return tags, err
1156 }
1157
1158 tags = append(tags, name)
1159 }
1160 if rs.Err() != nil {
1161 return tags, rs.Err()
1162 }
1163 return tags, nil
1164}
1165
1166func (me *PsqlDB) FindFeature(userID string, feature string) (*db.FeatureFlag, error) {
1167 ff := &db.FeatureFlag{}
1168 err := me.Db.Get(ff, `SELECT * FROM feature_flags WHERE user_id = $1 AND name = $2 ORDER BY expires_at DESC LIMIT 1`, userID, feature)
1169 if err != nil {
1170 return nil, err
1171 }
1172 return ff, nil
1173}
1174
1175func (me *PsqlDB) FindFeaturesByUser(userID string) ([]*db.FeatureFlag, error) {
1176 var features []*db.FeatureFlag
1177 // https://stackoverflow.com/a/16920077
1178 query := `SELECT DISTINCT ON (name) *
1179 FROM feature_flags
1180 WHERE user_id=$1
1181 ORDER BY name, expires_at DESC;`
1182 err := me.Db.Select(&features, query, userID)
1183 if err != nil {
1184 return nil, err
1185 }
1186 return features, nil
1187}
1188
1189func (me *PsqlDB) HasFeatureByUser(userID string, feature string) bool {
1190 ff, err := me.FindFeature(userID, feature)
1191 if err != nil {
1192 return false
1193 }
1194 return ff.IsValid()
1195}
1196
1197func (me *PsqlDB) InsertFeedItems(postID string, items []*db.FeedItem) error {
1198 tx, err := me.Db.Beginx()
1199 if err != nil {
1200 return err
1201 }
1202 defer func() {
1203 _ = tx.Rollback()
1204 }()
1205
1206 for _, item := range items {
1207 _, err := tx.Exec(
1208 `INSERT INTO feed_items (post_id, guid, data) VALUES ($1, $2, $3) RETURNING id;`,
1209 item.PostID,
1210 item.GUID,
1211 item.Data,
1212 )
1213 if err != nil {
1214 return fmt.Errorf(
1215 "post id:%s, link:%s, guid:%s, err:%w",
1216 item.PostID, item.Data.Link, item.GUID, err,
1217 )
1218 }
1219 }
1220
1221 err = tx.Commit()
1222 return err
1223}
1224
1225func (me *PsqlDB) FindFeedItemsByPostID(postID string) ([]*db.FeedItem, error) {
1226 var items []*db.FeedItem
1227 err := me.Db.Select(&items, `SELECT * FROM feed_items WHERE post_id=$1`, postID)
1228 if err != nil {
1229 return nil, err
1230 }
1231 return items, nil
1232}
1233
1234func (me *PsqlDB) InsertProject(userID, name, projectDir string) (string, error) {
1235 if !shared.IsValidSubdomain(name) {
1236 return "", fmt.Errorf("'%s' is not a valid project name, must match /^[a-z0-9-]+$/", name)
1237 }
1238
1239 var id string
1240 err := me.Db.QueryRow(`INSERT INTO projects (user_id, name, project_dir) VALUES ($1, $2, $3) RETURNING id;`, userID, name, projectDir).Scan(&id)
1241 if err != nil {
1242 return "", err
1243 }
1244 return id, nil
1245}
1246
1247func (me *PsqlDB) UpdateProject(userID, name string) error {
1248 _, err := me.Db.Exec(`UPDATE projects SET updated_at = $3 WHERE user_id = $1 AND name = $2;`, userID, name, time.Now())
1249 return err
1250}
1251
1252func (me *PsqlDB) FindProjectByName(userID, name string) (*db.Project, error) {
1253 project := &db.Project{}
1254 err := me.Db.Get(project, `SELECT * FROM projects WHERE user_id = $1 AND name = $2;`, userID, name)
1255 if err != nil {
1256 return nil, err
1257 }
1258 return project, nil
1259}
1260
1261func (me *PsqlDB) InsertToken(userID, name string) (string, error) {
1262 var token string
1263 err := me.Db.QueryRow(`INSERT INTO tokens (user_id, name) VALUES($1, $2) RETURNING token;`, userID, name).Scan(&token)
1264 if err != nil {
1265 return "", err
1266 }
1267 return token, nil
1268}
1269
1270func (me *PsqlDB) UpsertToken(userID, name string) (string, error) {
1271 token, _ := me.findTokenByName(userID, name)
1272 if token != "" {
1273 return token, nil
1274 }
1275
1276 token, err := me.InsertToken(userID, name)
1277 return token, err
1278}
1279
1280func (me *PsqlDB) findTokenByName(userID, name string) (string, error) {
1281 var token string
1282 err := me.Db.QueryRow(`SELECT token FROM tokens WHERE user_id = $1 AND name = $2`, userID, name).Scan(&token)
1283 if err != nil {
1284 return "", err
1285 }
1286 return token, nil
1287}
1288
1289func (me *PsqlDB) RemoveToken(tokenID string) error {
1290 _, err := me.Db.Exec(`DELETE FROM tokens WHERE id = $1`, tokenID)
1291 return err
1292}
1293
1294func (me *PsqlDB) FindTokensByUser(userID string) ([]*db.Token, error) {
1295 var tokens []*db.Token
1296 err := me.Db.Select(&tokens, `SELECT * FROM tokens WHERE user_id = $1`, userID)
1297 if err != nil {
1298 return nil, err
1299 }
1300 return tokens, nil
1301}
1302
1303func (me *PsqlDB) InsertFeature(userID, name string, expiresAt time.Time) (*db.FeatureFlag, error) {
1304 var featureID string
1305 err := me.Db.QueryRow(
1306 `INSERT INTO feature_flags (user_id, name, expires_at) VALUES ($1, $2, $3) RETURNING id;`,
1307 userID,
1308 name,
1309 expiresAt,
1310 ).Scan(&featureID)
1311 if err != nil {
1312 return nil, err
1313 }
1314
1315 feature, err := me.FindFeature(userID, name)
1316 if err != nil {
1317 return nil, err
1318 }
1319
1320 return feature, nil
1321}
1322
1323func (me *PsqlDB) RemoveFeature(userID string, name string) error {
1324 _, err := me.Db.Exec(`DELETE FROM feature_flags WHERE user_id = $1 AND name = $2`, userID, name)
1325 return err
1326}
1327
1328func (me *PsqlDB) createFeatureExpiresAt(userID, name string) time.Time {
1329 ff, _ := me.FindFeature(userID, name)
1330 // if the feature flag has already expired we don't want to add a year to it since that will
1331 // not grant the user a full year
1332 if ff == nil || !ff.IsValid() {
1333 t := time.Now()
1334 return t.AddDate(1, 0, 0)
1335 }
1336 return ff.ExpiresAt.AddDate(1, 0, 0)
1337}
1338
1339func (me *PsqlDB) AddPicoPlusUser(username, email, paymentType, txId string) error {
1340 user, err := me.FindUserByName(username)
1341 if err != nil {
1342 return err
1343 }
1344
1345 tx, err := me.Db.Beginx()
1346 if err != nil {
1347 return err
1348 }
1349 defer func() {
1350 _ = tx.Rollback()
1351 }()
1352
1353 var paymentHistoryId sql.NullString
1354 if paymentType != "" {
1355 data := db.PaymentHistoryData{
1356 Notes: "",
1357 TxID: txId,
1358 }
1359
1360 err := tx.QueryRow(
1361 `INSERT INTO payment_history (user_id, payment_type, amount, data) VALUES ($1, $2, 24 * 1000000, $3) RETURNING id;`,
1362 user.ID,
1363 paymentType,
1364 data,
1365 ).Scan(&paymentHistoryId)
1366 if err != nil {
1367 return err
1368 }
1369 }
1370
1371 plus := me.createFeatureExpiresAt(user.ID, "plus")
1372 plusQuery := fmt.Sprintf(`INSERT INTO feature_flags (user_id, name, data, expires_at, payment_history_id)
1373 VALUES ($1, 'plus', '{"storage_max":10000000000, "file_max":50000000, "email": "%s"}'::jsonb, $2, $3);`, email)
1374 _, err = tx.Exec(plusQuery, user.ID, plus, paymentHistoryId)
1375 if err != nil {
1376 return err
1377 }
1378
1379 return tx.Commit()
1380}
1381
1382func (me *PsqlDB) UpsertProject(userID, projectName, projectDir string) (*db.Project, error) {
1383 project, err := me.FindProjectByName(userID, projectName)
1384 if err == nil {
1385 // this just updates the `createdAt` timestamp, useful for book-keeping
1386 err = me.UpdateProject(userID, projectName)
1387 if err != nil {
1388 me.Logger.Error("could not update project", "err", err)
1389 return nil, err
1390 }
1391 return project, nil
1392 }
1393
1394 _, err = me.InsertProject(userID, projectName, projectName)
1395 if err != nil {
1396 me.Logger.Error("could not create project", "err", err)
1397 return nil, err
1398 }
1399 return me.FindProjectByName(userID, projectName)
1400}
1401
1402func (me *PsqlDB) findPagesStats(userID string) (*db.UserServiceStats, error) {
1403 stats := db.UserServiceStats{
1404 Service: "pgs",
1405 }
1406 err := me.Db.QueryRow(
1407 `SELECT count(id), min(created_at), max(created_at), max(updated_at) FROM projects WHERE user_id=$1`,
1408 userID,
1409 ).Scan(&stats.Num, &stats.FirstCreatedAt, &stats.LastestCreatedAt, &stats.LatestUpdatedAt)
1410 if err != nil {
1411 return nil, err
1412 }
1413
1414 return &stats, nil
1415}
1416
1417func (me *PsqlDB) InsertTunsEventLog(log *db.TunsEventLog) error {
1418 _, err := me.Db.Exec(
1419 `INSERT INTO tuns_event_logs
1420 (user_id, server_id, remote_addr, event_type, tunnel_type, connection_type, tunnel_id)
1421 VALUES
1422 ($1, $2, $3, $4, $5, $6, $7)`,
1423 log.UserId, log.ServerID, log.RemoteAddr, log.EventType, log.TunnelType,
1424 log.ConnectionType, log.TunnelID,
1425 )
1426 return err
1427}
1428
1429func (me *PsqlDB) FindTunsEventLogsByAddr(userID, addr string) ([]*db.TunsEventLog, error) {
1430 var logs []*db.TunsEventLog
1431 err := me.Db.Select(&logs,
1432 `SELECT * FROM tuns_event_logs WHERE user_id=$1 AND tunnel_id=$2 ORDER BY created_at DESC`, userID, addr)
1433 if err != nil {
1434 return nil, err
1435 }
1436 return logs, nil
1437}
1438
1439func (me *PsqlDB) FindTunsEventLogs(userID string) ([]*db.TunsEventLog, error) {
1440 var logs []*db.TunsEventLog
1441 err := me.Db.Select(&logs,
1442 `SELECT * FROM tuns_event_logs WHERE user_id=$1 ORDER BY created_at DESC`, userID)
1443 if err != nil {
1444 return nil, err
1445 }
1446 return logs, nil
1447}
1448
1449func (me *PsqlDB) FindUserStats(userID string) (*db.UserStats, error) {
1450 stats := db.UserStats{}
1451 rs, err := me.Db.Queryx(`SELECT cur_space, count(id), min(created_at), max(created_at), max(updated_at) FROM posts WHERE user_id=$1 GROUP BY cur_space`, userID)
1452 if err != nil {
1453 return nil, err
1454 }
1455 defer func() { _ = rs.Close() }()
1456
1457 for rs.Next() {
1458 stat := db.UserServiceStats{}
1459 err := rs.Scan(&stat.Service, &stat.Num, &stat.FirstCreatedAt, &stat.LastestCreatedAt, &stat.LatestUpdatedAt)
1460 if err != nil {
1461 return nil, err
1462 }
1463 switch stat.Service {
1464 case "prose":
1465 stats.Prose = stat
1466 case "pastes":
1467 stats.Pastes = stat
1468 case "feeds":
1469 stats.Feeds = stat
1470 }
1471 }
1472
1473 if rs.Err() != nil {
1474 return nil, rs.Err()
1475 }
1476
1477 pgs, err := me.findPagesStats(userID)
1478 if err != nil {
1479 return nil, err
1480 }
1481 stats.Pages = *pgs
1482 return &stats, err
1483}
1484
1485func (me *PsqlDB) FindAccessLogs(userID string, fromDate *time.Time) ([]*db.AccessLog, error) {
1486 var logs []*db.AccessLog
1487 err := me.Db.Select(&logs, `SELECT * FROM access_logs WHERE user_id=$1 AND created_at >= $2 ORDER BY created_at ASC`, userID, fromDate)
1488 if err != nil {
1489 return nil, err
1490 }
1491 return logs, nil
1492}
1493
1494func (me *PsqlDB) FindAccessLogsByPubkey(pubkey string, fromDate *time.Time) ([]*db.AccessLog, error) {
1495 var logs []*db.AccessLog
1496 err := me.Db.Select(&logs, `SELECT * FROM access_logs WHERE pubkey=$1 AND created_at >= $2 ORDER BY created_at ASC`, pubkey, fromDate)
1497 if err != nil {
1498 return nil, err
1499 }
1500 return logs, nil
1501}
1502
1503func (me *PsqlDB) FindPubkeysInAccessLogs(userID string) ([]string, error) {
1504 var pubkeys []string
1505 err := me.Db.Select(&pubkeys, `SELECT DISTINCT(pubkey) FROM access_logs WHERE user_id=$1`, userID)
1506 if err != nil {
1507 return nil, err
1508 }
1509 return pubkeys, nil
1510}
1511
1512func (me *PsqlDB) InsertAccessLog(log *db.AccessLog) error {
1513 _, err := me.Db.Exec(
1514 `INSERT INTO access_logs (user_id, service, pubkey, identity) VALUES ($1, $2, $3, $4);`,
1515 log.UserID,
1516 log.Service,
1517 log.Pubkey,
1518 log.Identity,
1519 )
1520 return err
1521}
1522
1523func (me *PsqlDB) UpsertPipeMonitor(userID, topic string, dur time.Duration, winEnd *time.Time) error {
1524 durStr := fmt.Sprintf("%d seconds", int64(dur.Seconds()))
1525 _, err := me.Db.Exec(
1526 `INSERT INTO pipe_monitors (user_id, topic, window_dur, window_end)
1527 VALUES ($1, $2, $3::interval, $4)
1528 ON CONFLICT (user_id, topic) DO UPDATE SET window_dur = $3::interval, window_end = $4, updated_at = NOW();`,
1529 userID,
1530 topic,
1531 durStr,
1532 winEnd,
1533 )
1534 return err
1535}
1536
1537func (me *PsqlDB) UpdatePipeMonitorLastPing(userID, topic string, lastPing *time.Time) error {
1538 _, err := me.Db.Exec(
1539 `UPDATE pipe_monitors SET last_ping = $3, updated_at = NOW() WHERE user_id = $1 AND topic = $2;`,
1540 userID,
1541 topic,
1542 lastPing,
1543 )
1544 return err
1545}
1546
1547func (me *PsqlDB) RemovePipeMonitor(userID, topic string) error {
1548 _, err := me.Db.Exec(
1549 `DELETE FROM pipe_monitors WHERE user_id = $1 AND topic = $2;`,
1550 userID,
1551 topic,
1552 )
1553 return err
1554}
1555
1556func (me *PsqlDB) FindPipeMonitorByTopic(userID, topic string) (*db.PipeMonitor, error) {
1557 monitor := &db.PipeMonitor{}
1558 err := me.Db.Get(monitor, `SELECT id, user_id, topic, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors WHERE user_id = $1 AND topic = $2;`, userID, topic)
1559 if err != nil {
1560 return nil, err
1561 }
1562 return monitor, nil
1563}
1564
1565func (me *PsqlDB) FindPipeMonitorsByUser(userID string) ([]*db.PipeMonitor, error) {
1566 var monitors []*db.PipeMonitor
1567 err := me.Db.Select(&monitors, `SELECT id, user_id, topic, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors WHERE user_id = $1 ORDER BY topic;`, userID)
1568 if err != nil {
1569 return nil, err
1570 }
1571 return monitors, nil
1572}
1573
1574func (me *PsqlDB) InsertPipeMonitorHistory(monitorID string, windowDur time.Duration, windowEnd, lastPing *time.Time) error {
1575 durStr := fmt.Sprintf("%d seconds", int64(windowDur.Seconds()))
1576 _, err := me.Db.Exec(
1577 `INSERT INTO pipe_monitors_history (monitor_id, window_dur, window_end, last_ping) VALUES ($1, $2::interval, $3, $4)`,
1578 monitorID, durStr, windowEnd, lastPing,
1579 )
1580 return err
1581}
1582
1583func (me *PsqlDB) FindPipeMonitorHistory(monitorID string, from, to time.Time) ([]*db.PipeMonitorHistory, error) {
1584 var history []*db.PipeMonitorHistory
1585 err := me.Db.Select(
1586 &history,
1587 `SELECT id, monitor_id, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors_history WHERE monitor_id = $1 AND last_ping <= $2 AND window_end >= $3 ORDER BY last_ping ASC`,
1588 monitorID, to, from,
1589 )
1590 if err != nil {
1591 return nil, err
1592 }
1593 return history, nil
1594}