Eric Bower
·
2026-03-05
storage.go
1package postgres
2
3import (
4 "database/sql"
5 "errors"
6 "fmt"
7 "log/slog"
8 "math"
9 "slices"
10 "strings"
11 "time"
12
13 "github.com/jmoiron/sqlx"
14 _ "github.com/lib/pq"
15 "github.com/picosh/pico/pkg/db"
16 "github.com/picosh/pico/pkg/shared"
17)
18
var (
	// PAGER_SIZE is presumably the default page size; it is not referenced in
	// the visible part of this file (TODO confirm against callers).
	PAGER_SIZE = 15

	// SelectPost is the column list shared by the post queries in this file.
	// It is spliced into both the SELECT and the GROUP BY clauses, so its
	// order must match the Scan order in CreatePostWithTagsByRow.
	SelectPost = `
	posts.id, user_id, app_users.name, filename, slug, title, text, description,
	posts.created_at, publish_at, posts.updated_at, hidden, file_size, mime_type, shasum, data, expires_at, views`
)
24
25type PsqlDB struct {
26 Logger *slog.Logger
27 Db *sqlx.DB
28}
29
// RowScanner is the single-method view of a result row needed by
// CreatePostWithTagsByRow; in this file it is satisfied by both the value
// returned from QueryRow and the rows cursor returned from Queryx.
type RowScanner interface {
	// Scan copies the current row's columns into dest, in order.
	Scan(dest ...any) error
}
33
34func CreatePostWithTagsByRow(r RowScanner) (*db.Post, error) {
35 post := &db.Post{}
36 tagStr := ""
37 err := r.Scan(
38 &post.ID,
39 &post.UserID,
40 &post.Username,
41 &post.Filename,
42 &post.Slug,
43 &post.Title,
44 &post.Text,
45 &post.Description,
46 &post.CreatedAt,
47 &post.PublishAt,
48 &post.UpdatedAt,
49 &post.Hidden,
50 &post.FileSize,
51 &post.MimeType,
52 &post.Shasum,
53 &post.Data,
54 &post.ExpiresAt,
55 &post.Views,
56 &tagStr,
57 )
58 if err != nil {
59 return nil, err
60 }
61
62 tags := strings.Split(tagStr, ",")
63 for _, tag := range tags {
64 tg := strings.TrimSpace(tag)
65 if tg == "" {
66 continue
67 }
68 post.Tags = append(post.Tags, tg)
69 }
70
71 return post, nil
72}
73
74func NewDB(databaseUrl string, logger *slog.Logger) *PsqlDB {
75 var err error
76 d := &PsqlDB{
77 Logger: logger,
78 }
79 d.Logger.Info("Connecting to postgres", "databaseUrl", databaseUrl)
80
81 db, err := sqlx.Connect("postgres", databaseUrl)
82 if err != nil {
83 d.Logger.Error(err.Error())
84 }
85 d.Db = db
86 return d
87}
88
89func (me *PsqlDB) RegisterUser(username, pubkey, comment string) (*db.User, error) {
90 lowerName := strings.ToLower(username)
91 valid, err := me.validateName(lowerName)
92 if !valid {
93 return nil, err
94 }
95
96 tx, err := me.Db.Beginx()
97 if err != nil {
98 return nil, err
99 }
100 defer func() {
101 _ = tx.Rollback()
102 }()
103
104 var id string
105 err = tx.QueryRow(`INSERT INTO app_users (name) VALUES($1) returning id`, lowerName).Scan(&id)
106 if err != nil {
107 return nil, err
108 }
109
110 err = me.insertPublicKeyWithTx(id, pubkey, comment, tx)
111 if err != nil {
112 return nil, err
113 }
114
115 err = tx.Commit()
116 if err != nil {
117 return nil, err
118 }
119
120 return me.FindUserByKey(username, pubkey)
121}
122
123func (me *PsqlDB) insertPublicKeyWithTx(userID, key, name string, tx *sqlx.Tx) error {
124 pk, _ := me.findPublicKeyByKey(key)
125 if pk != nil {
126 return db.ErrPublicKeyTaken
127 }
128 query := `INSERT INTO public_keys (user_id, public_key, name) VALUES ($1, $2, $3)`
129 _, err := tx.Exec(query, userID, key, name)
130 return err
131}
132
133func (me *PsqlDB) InsertPublicKey(userID, key, name string) error {
134 pk, _ := me.findPublicKeyByKey(key)
135 if pk != nil {
136 return db.ErrPublicKeyTaken
137 }
138 query := `INSERT INTO public_keys (user_id, public_key, name) VALUES ($1, $2, $3)`
139 _, err := me.Db.Exec(query, userID, key, name)
140 return err
141}
142
143func (me *PsqlDB) UpdatePublicKey(pubkeyID, name string) (*db.PublicKey, error) {
144 pk, err := me.findPublicKey(pubkeyID)
145 if err != nil {
146 return nil, err
147 }
148
149 query := `UPDATE public_keys SET name=$1 WHERE id=$2;`
150 _, err = me.Db.Exec(query, name, pk.ID)
151 if err != nil {
152 return nil, err
153 }
154
155 pk, err = me.findPublicKey(pubkeyID)
156 if err != nil {
157 return nil, err
158 }
159 return pk, nil
160}
161
162func (me *PsqlDB) findPublicKeyByKey(key string) (*db.PublicKey, error) {
163 var keys []*db.PublicKey
164 rs, err := me.Db.Queryx(`SELECT id, user_id, name, public_key, created_at FROM public_keys WHERE public_key = $1`, key)
165 if err != nil {
166 return nil, err
167 }
168 defer func() { _ = rs.Close() }()
169
170 for rs.Next() {
171 pk := &db.PublicKey{}
172 err := rs.Scan(&pk.ID, &pk.UserID, &pk.Name, &pk.Key, &pk.CreatedAt)
173 if err != nil {
174 return nil, err
175 }
176
177 keys = append(keys, pk)
178 }
179
180 if rs.Err() != nil {
181 return nil, rs.Err()
182 }
183
184 if len(keys) == 0 {
185 return nil, fmt.Errorf("pubkey not found in our database: [%s]", key)
186 }
187
188 // When we run PublicKeyByKey and there are multiple public keys returned from the database
189 // that should mean that we don't have the correct username for this public key.
190 // When that happens we need to reject the authentication and ask the user to provide the correct
191 // username when using ssh. So instead of `ssh <domain>` it should be `ssh user@<domain>`
192 if len(keys) > 1 {
193 return nil, &db.ErrMultiplePublicKeys{}
194 }
195
196 return keys[0], nil
197}
198
199func (me *PsqlDB) findPublicKey(pubkeyID string) (*db.PublicKey, error) {
200 pk := &db.PublicKey{}
201 err := me.Db.Get(pk, `SELECT * FROM public_keys WHERE id = $1`, pubkeyID)
202 if err != nil {
203 return nil, err
204 }
205 return pk, nil
206}
207
208func (me *PsqlDB) FindKeysByUser(user *db.User) ([]*db.PublicKey, error) {
209 var keys []*db.PublicKey
210 err := me.Db.Select(&keys, `SELECT * FROM public_keys WHERE user_id = $1 ORDER BY created_at ASC`, user.ID)
211 if err != nil {
212 return nil, err
213 }
214 return keys, nil
215}
216
217func (me *PsqlDB) RemoveKeys(keyIDs []string) error {
218 param := "{" + strings.Join(keyIDs, ",") + "}"
219 _, err := me.Db.Exec(`DELETE FROM public_keys WHERE id = ANY($1::uuid[])`, param)
220 return err
221}
222
223func (me *PsqlDB) FindUsersWithPost(space string) ([]*db.User, error) {
224 var users []*db.User
225 rs, err := me.Db.Queryx(
226 `SELECT u.id, u.name, u.created_at
227 FROM app_users u
228 INNER JOIN posts ON u.id=posts.user_id
229 WHERE cur_space='feeds'
230 GROUP BY u.id, u.name, u.created_at
231 ORDER BY name ASC`,
232 )
233 if err != nil {
234 return users, err
235 }
236 defer func() { _ = rs.Close() }()
237 for rs.Next() {
238 var name sql.NullString
239 user := &db.User{}
240 err := rs.Scan(
241 &user.ID,
242 &name,
243 &user.CreatedAt,
244 )
245 if err != nil {
246 return users, err
247 }
248 user.Name = name.String
249
250 users = append(users, user)
251 }
252 if rs.Err() != nil {
253 return users, rs.Err()
254 }
255 return users, nil
256}
257
258func (me *PsqlDB) FindUserByKey(username string, key string) (*db.User, error) {
259 me.Logger.Info("attempting to find user with only public key", "key", key)
260 pk, err := me.findPublicKeyByKey(key)
261 if err == nil {
262 me.Logger.Info("found pubkey, looking for user", "key", key, "userId", pk.UserID)
263 user, err := me.FindUser(pk.UserID)
264 if err != nil {
265 return nil, err
266 }
267 user.PublicKey = pk
268 return user, nil
269 }
270
271 if errors.Is(err, &db.ErrMultiplePublicKeys{}) {
272 me.Logger.Info("detected multiple users with same public key", "user", username)
273 user, err := me.findUserForNameAndKey(username, key)
274 if err != nil {
275 me.Logger.Info("could not find user by username and public key", "user", username, "key", key)
276 // this is a little hacky but if we cannot find a user by name and public key
277 // then we return the multiple keys detected error so the user knows to specify their
278 // when logging in
279 return nil, &db.ErrMultiplePublicKeys{}
280 }
281 return user, nil
282 }
283
284 return nil, err
285}
286
287func (me *PsqlDB) FindUserByPubkey(key string) (*db.User, error) {
288 me.Logger.Info("attempting to find user with only public key", "key", key)
289 pk, err := me.findPublicKeyByKey(key)
290 if err != nil {
291 return nil, err
292 }
293
294 me.Logger.Info("found pubkey, looking for user", "key", key, "userId", pk.UserID)
295 user, err := me.FindUser(pk.UserID)
296 if err != nil {
297 return nil, err
298 }
299 user.PublicKey = pk
300 return user, nil
301}
302
303func (me *PsqlDB) FindUser(userID string) (*db.User, error) {
304 user := &db.User{}
305 err := me.Db.Get(user, `SELECT id, COALESCE(name, '') as name, created_at FROM app_users WHERE id = $1`, userID)
306 if err != nil {
307 return nil, err
308 }
309 return user, nil
310}
311
312func (me *PsqlDB) validateName(name string) (bool, error) {
313 lower := strings.ToLower(name)
314 if slices.Contains(db.DenyList, lower) {
315 return false, fmt.Errorf("%s is on deny list: %w", lower, db.ErrNameDenied)
316 }
317 v := db.NameValidator.MatchString(lower)
318 if !v {
319 return false, fmt.Errorf("%s is invalid: %w", lower, db.ErrNameInvalid)
320 }
321 user, _ := me.FindUserByName(lower)
322 if user == nil {
323 return true, nil
324 }
325 return false, fmt.Errorf("%s already taken: %w", lower, db.ErrNameTaken)
326}
327
328func (me *PsqlDB) FindUserByName(name string) (*db.User, error) {
329 user := &db.User{}
330 err := me.Db.Get(user, `SELECT * FROM app_users WHERE name = $1`, strings.ToLower(name))
331 if err != nil {
332 return nil, err
333 }
334 return user, nil
335}
336
337func (me *PsqlDB) findUserForNameAndKey(name string, key string) (*db.User, error) {
338 user := &db.User{}
339 pk := &db.PublicKey{}
340
341 r := me.Db.QueryRow(`SELECT app_users.id, app_users.name, app_users.created_at, public_keys.id as pk_id, public_keys.public_key, public_keys.created_at as pk_created_at FROM app_users LEFT JOIN public_keys ON public_keys.user_id = app_users.id WHERE app_users.name = $1 AND public_keys.public_key = $2`, strings.ToLower(name), key)
342 err := r.Scan(&user.ID, &user.Name, &user.CreatedAt, &pk.ID, &pk.Key, &pk.CreatedAt)
343 if err != nil {
344 return nil, err
345 }
346
347 user.PublicKey = pk
348 return user, nil
349}
350
351func (me *PsqlDB) FindUserByToken(token string) (*db.User, error) {
352 user := &db.User{}
353 err := me.Db.Get(user, `
354 SELECT app_users.id, app_users.name, app_users.created_at
355 FROM app_users
356 LEFT JOIN tokens ON tokens.user_id = app_users.id
357 WHERE tokens.token = $1 AND tokens.expires_at > NOW()`, token)
358 if err != nil {
359 return nil, err
360 }
361 return user, nil
362}
363
364func (me *PsqlDB) FindPostWithFilename(filename string, persona_id string, space string) (*db.Post, error) {
365 query := fmt.Sprintf(`
366 SELECT %s, STRING_AGG(coalesce(post_tags.name, ''), ',') tags
367 FROM posts
368 LEFT JOIN app_users ON app_users.id = posts.user_id
369 LEFT JOIN post_tags ON post_tags.post_id = posts.id
370 WHERE filename = $1 AND user_id = $2 AND cur_space = $3
371 GROUP BY %s`, SelectPost, SelectPost)
372 r := me.Db.QueryRow(query, filename, persona_id, space)
373 post, err := CreatePostWithTagsByRow(r)
374 if err != nil {
375 return nil, err
376 }
377
378 return post, nil
379}
380
381func (me *PsqlDB) FindPostWithSlug(slug string, user_id string, space string) (*db.Post, error) {
382 query := fmt.Sprintf(`
383 SELECT %s, STRING_AGG(coalesce(post_tags.name, ''), ',') tags
384 FROM posts
385 LEFT JOIN app_users ON app_users.id = posts.user_id
386 LEFT JOIN post_tags ON post_tags.post_id = posts.id
387 WHERE slug = $1 AND user_id = $2 AND cur_space = $3
388 GROUP BY %s`, SelectPost, SelectPost)
389 r := me.Db.QueryRow(query, slug, user_id, space)
390 post, err := CreatePostWithTagsByRow(r)
391 if err != nil {
392 // attempt to find post inside post_aliases
393 alias := me.Db.QueryRow(
394 `SELECT post_aliases.post_id FROM post_aliases
395 INNER JOIN posts ON posts.id = post_aliases.post_id
396 WHERE post_aliases.slug = $1 AND posts.user_id = $2`,
397 slug, user_id,
398 )
399 postID := ""
400 err := alias.Scan(&postID)
401 if err != nil {
402 return nil, err
403 }
404
405 return me.FindPost(postID)
406 }
407
408 return post, nil
409}
410
411func (me *PsqlDB) FindPost(postID string) (*db.Post, error) {
412 post := &db.Post{}
413 query := fmt.Sprintf(`
414 SELECT %s
415 FROM posts
416 LEFT JOIN app_users ON app_users.id = posts.user_id
417 WHERE posts.id = $1`, SelectPost)
418 err := me.Db.Get(post, query, postID)
419 if err != nil {
420 return nil, err
421 }
422 return post, nil
423}
424
425func (me *PsqlDB) postPager(rs *sqlx.Rows, pageNum int, space string, tag string) (*db.Paginate[*db.Post], error) {
426 var posts []*db.Post
427 for rs.Next() {
428 post := &db.Post{}
429 err := rs.Scan(
430 &post.ID,
431 &post.UserID,
432 &post.Filename,
433 &post.Slug,
434 &post.Title,
435 &post.Text,
436 &post.Description,
437 &post.PublishAt,
438 &post.Username,
439 &post.UpdatedAt,
440 &post.MimeType,
441 )
442 if err != nil {
443 return nil, err
444 }
445
446 posts = append(posts, post)
447 }
448 if rs.Err() != nil {
449 return nil, rs.Err()
450 }
451
452 var count int
453 var err error
454 if tag == "" {
455 err = me.Db.QueryRow(`SELECT count(id) FROM posts WHERE hidden = FALSE AND cur_space=$1`, space).Scan(&count)
456 } else {
457 err = me.Db.QueryRow(`
458 SELECT count(posts.id)
459 FROM posts
460 LEFT JOIN post_tags ON post_tags.post_id = posts.id
461 WHERE hidden = FALSE AND cur_space=$1 and post_tags.name = $2`, space, tag).Scan(&count)
462 }
463 if err != nil {
464 return nil, err
465 }
466
467 pager := &db.Paginate[*db.Post]{
468 Data: posts,
469 Total: int(math.Ceil(float64(count) / float64(pageNum))),
470 }
471
472 return pager, nil
473}
474
475func (me *PsqlDB) FindPostsByFeed(page *db.Pager, space string) (*db.Paginate[*db.Post], error) {
476 query := `
477 SELECT *
478 FROM (
479 SELECT DISTINCT ON (posts.user_id)
480 posts.id,
481 posts.user_id,
482 posts.filename,
483 posts.slug,
484 posts.title,
485 posts.text,
486 posts.description,
487 posts.publish_at,
488 app_users.name AS username,
489 posts.updated_at,
490 posts.mime_type
491 FROM posts
492 LEFT JOIN app_users ON app_users.id = posts.user_id
493 WHERE
494 hidden = FALSE
495 AND publish_at::date <= CURRENT_DATE
496 AND cur_space = $3
497 ORDER BY posts.user_id, publish_at DESC
498 ) AS latest_posts
499 ORDER BY publish_at DESC
500 LIMIT $1 OFFSET $2`
501 rs, err := me.Db.Queryx(query, page.Num, page.Num*page.Page, space)
502 if err != nil {
503 return nil, err
504 }
505 defer func() { _ = rs.Close() }()
506 return me.postPager(rs, page.Num, space, "")
507}
508
509func (me *PsqlDB) InsertPost(post *db.Post) (*db.Post, error) {
510 var id string
511 query := `
512 INSERT INTO posts
513 (user_id, filename, slug, title, text, description, publish_at, hidden, cur_space,
514 file_size, mime_type, shasum, data, expires_at, updated_at)
515 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
516 RETURNING id`
517 err := me.Db.QueryRow(
518 query,
519 post.UserID,
520 post.Filename,
521 post.Slug,
522 post.Title,
523 post.Text,
524 post.Description,
525 post.PublishAt,
526 post.Hidden,
527 post.Space,
528 post.FileSize,
529 post.MimeType,
530 post.Shasum,
531 post.Data,
532 post.ExpiresAt,
533 post.UpdatedAt,
534 ).Scan(&id)
535 if err != nil {
536 return nil, err
537 }
538
539 return me.FindPost(id)
540}
541
542func (me *PsqlDB) UpdatePost(post *db.Post) (*db.Post, error) {
543 query := `
544 UPDATE posts
545 SET slug = $1, title = $2, text = $3, description = $4, updated_at = $5, publish_at = $6,
546 file_size = $7, shasum = $8, data = $9, hidden = $11, expires_at = $12
547 WHERE id = $10`
548 _, err := me.Db.Exec(
549 query,
550 post.Slug,
551 post.Title,
552 post.Text,
553 post.Description,
554 post.UpdatedAt,
555 post.PublishAt,
556 post.FileSize,
557 post.Shasum,
558 post.Data,
559 post.ID,
560 post.Hidden,
561 post.ExpiresAt,
562 )
563 if err != nil {
564 return nil, err
565 }
566
567 return me.FindPost(post.ID)
568}
569
570func (me *PsqlDB) RemovePosts(postIDs []string) error {
571 param := "{" + strings.Join(postIDs, ",") + "}"
572 _, err := me.Db.Exec(`DELETE FROM posts WHERE id = ANY($1::uuid[])`, param)
573 return err
574}
575
576func (me *PsqlDB) FindPostsByUser(page *db.Pager, userID string, space string) (*db.Paginate[*db.Post], error) {
577 var posts []*db.Post
578 query := fmt.Sprintf(`
579 SELECT %s, STRING_AGG(coalesce(post_tags.name, ''), ',') tags
580 FROM posts
581 LEFT JOIN app_users ON app_users.id = posts.user_id
582 LEFT JOIN post_tags ON post_tags.post_id = posts.id
583 WHERE
584 hidden = FALSE AND
585 user_id = $1 AND
586 publish_at::date <= CURRENT_DATE AND
587 cur_space = $2
588 GROUP BY %s
589 ORDER BY publish_at DESC, slug DESC
590 LIMIT $3 OFFSET $4`, SelectPost, SelectPost)
591 rs, err := me.Db.Queryx(
592 query,
593 userID,
594 space,
595 page.Num,
596 page.Num*page.Page,
597 )
598 if err != nil {
599 return nil, err
600 }
601 defer func() { _ = rs.Close() }()
602 for rs.Next() {
603 post, err := CreatePostWithTagsByRow(rs)
604 if err != nil {
605 return nil, err
606 }
607
608 posts = append(posts, post)
609 }
610
611 if rs.Err() != nil {
612 return nil, rs.Err()
613 }
614
615 var count int
616 err = me.Db.QueryRow(`SELECT count(id) FROM posts WHERE hidden = FALSE AND cur_space=$1`, space).Scan(&count)
617 if err != nil {
618 return nil, err
619 }
620
621 pager := &db.Paginate[*db.Post]{
622 Data: posts,
623 Total: int(math.Ceil(float64(count) / float64(page.Num))),
624 }
625 return pager, nil
626}
627
628func (me *PsqlDB) FindAllPostsByUser(userID string, space string) ([]*db.Post, error) {
629 var posts []*db.Post
630 query := fmt.Sprintf(`
631 SELECT %s
632 FROM posts
633 LEFT JOIN app_users ON app_users.id = posts.user_id
634 WHERE
635 user_id = $1 AND
636 cur_space = $2
637 ORDER BY publish_at DESC`, SelectPost)
638 err := me.Db.Select(&posts, query, userID, space)
639 if err != nil {
640 return nil, err
641 }
642 return posts, nil
643}
644
645func (me *PsqlDB) FindPosts() ([]*db.Post, error) {
646 var posts []*db.Post
647 query := fmt.Sprintf(`
648 SELECT %s
649 FROM posts
650 LEFT JOIN app_users ON app_users.id = posts.user_id`, SelectPost)
651 err := me.Db.Select(&posts, query)
652 if err != nil {
653 return nil, err
654 }
655 return posts, nil
656}
657
658func (me *PsqlDB) FindExpiredPosts(space string) ([]*db.Post, error) {
659 var posts []*db.Post
660 query := fmt.Sprintf(`
661 SELECT %s
662 FROM posts
663 LEFT JOIN app_users ON app_users.id = posts.user_id
664 WHERE
665 cur_space = $1 AND
666 expires_at <= now();
667 `, SelectPost)
668 err := me.Db.Select(&posts, query, space)
669 if err != nil {
670 return nil, err
671 }
672 return posts, nil
673}
674
675func (me *PsqlDB) Close() error {
676 me.Logger.Info("Closing db")
677 return me.Db.Close()
678}
679
// newNullString converts s into a sql.NullString, mapping the empty string to
// the zero value (stored as SQL NULL) and any other string to a valid value.
func newNullString(s string) sql.NullString {
	return sql.NullString{String: s, Valid: s != ""}
}
689
690func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
691 _, err := me.Db.Exec(
692 `INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status, content_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
693 visit.UserID,
694 newNullString(visit.ProjectID),
695 newNullString(visit.PostID),
696 newNullString(visit.Namespace),
697 visit.Host,
698 visit.Path,
699 visit.IpAddress,
700 visit.UserAgent,
701 visit.Referer,
702 visit.Status,
703 visit.ContentType,
704 )
705 return err
706}
707
708func visitFilterBy(opts *db.SummaryOpts) (string, string) {
709 where := ""
710 val := ""
711 if opts.Host != "" {
712 where = "host"
713 val = opts.Host
714 } else if opts.Path != "" {
715 where = "path"
716 val = opts.Path
717 }
718
719 return where, val
720}
721
722func (me *PsqlDB) visitUnique(opts *db.SummaryOpts) ([]*db.VisitInterval, error) {
723 where, with := visitFilterBy(opts)
724 uniqueVisitors := fmt.Sprintf(`SELECT
725 date_trunc('%s', created_at) as interval_start,
726 count(DISTINCT ip_address) as unique_visitors
727 FROM analytics_visits
728 WHERE created_at >= $1 AND %s = $2 AND user_id = $3 AND status <> 404
729 GROUP BY interval_start`, opts.Interval, where)
730
731 intervals := []*db.VisitInterval{}
732 rs, err := me.Db.Queryx(uniqueVisitors, opts.Origin, with, opts.UserID)
733 if err != nil {
734 return nil, err
735 }
736 defer func() { _ = rs.Close() }()
737
738 for rs.Next() {
739 interval := &db.VisitInterval{}
740 err := rs.Scan(
741 &interval.Interval,
742 &interval.Visitors,
743 )
744 if err != nil {
745 return nil, err
746 }
747
748 intervals = append(intervals, interval)
749 }
750 if rs.Err() != nil {
751 return nil, rs.Err()
752 }
753 return intervals, nil
754}
755
756func (me *PsqlDB) visitReferer(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
757 where, with := visitFilterBy(opts)
758 topUrls := fmt.Sprintf(`SELECT
759 referer,
760 count(DISTINCT ip_address) as referer_count
761 FROM analytics_visits
762 WHERE created_at >= $1 AND %s = $2 AND user_id = $3 AND referer <> '' AND status <> 404
763 GROUP BY referer
764 ORDER BY referer_count DESC
765 LIMIT 10`, where)
766
767 intervals := []*db.VisitUrl{}
768 rs, err := me.Db.Queryx(topUrls, opts.Origin, with, opts.UserID)
769 if err != nil {
770 return nil, err
771 }
772 defer func() { _ = rs.Close() }()
773
774 for rs.Next() {
775 interval := &db.VisitUrl{}
776 err := rs.Scan(
777 &interval.Url,
778 &interval.Count,
779 )
780 if err != nil {
781 return nil, err
782 }
783
784 intervals = append(intervals, interval)
785 }
786 if rs.Err() != nil {
787 return nil, rs.Err()
788 }
789 return intervals, nil
790}
791
792func (me *PsqlDB) visitUrl(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
793 where, with := visitFilterBy(opts)
794 topUrls := fmt.Sprintf(`SELECT
795 path,
796 count(DISTINCT ip_address) as path_count
797 FROM analytics_visits
798 WHERE created_at >= $1 AND %s = $2 AND user_id = $3 AND path <> '' AND status <> 404
799 GROUP BY path
800 ORDER BY path_count DESC
801 LIMIT 10`, where)
802
803 intervals := []*db.VisitUrl{}
804 rs, err := me.Db.Queryx(topUrls, opts.Origin, with, opts.UserID)
805 if err != nil {
806 return nil, err
807 }
808 defer func() { _ = rs.Close() }()
809
810 for rs.Next() {
811 interval := &db.VisitUrl{}
812 err := rs.Scan(
813 &interval.Url,
814 &interval.Count,
815 )
816 if err != nil {
817 return nil, err
818 }
819
820 intervals = append(intervals, interval)
821 }
822 if rs.Err() != nil {
823 return nil, rs.Err()
824 }
825 return intervals, nil
826}
827
828func (me *PsqlDB) VisitUrlNotFound(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
829 limit := opts.Limit
830 if limit == 0 {
831 limit = 10
832 }
833 where, with := visitFilterBy(opts)
834 topUrls := fmt.Sprintf(`SELECT
835 path,
836 count(DISTINCT ip_address) as path_count
837 FROM analytics_visits
838 WHERE created_at >= $1 AND %s = $2 AND user_id = $3 AND path <> '' AND status = 404
839 GROUP BY path
840 ORDER BY path_count DESC
841 LIMIT %d`, where, limit)
842
843 intervals := []*db.VisitUrl{}
844 rs, err := me.Db.Queryx(topUrls, opts.Origin, with, opts.UserID)
845 if err != nil {
846 return nil, err
847 }
848 defer func() { _ = rs.Close() }()
849
850 for rs.Next() {
851 interval := &db.VisitUrl{}
852 err := rs.Scan(
853 &interval.Url,
854 &interval.Count,
855 )
856 if err != nil {
857 return nil, err
858 }
859
860 intervals = append(intervals, interval)
861 }
862 if rs.Err() != nil {
863 return nil, rs.Err()
864 }
865 return intervals, nil
866}
867
868func (me *PsqlDB) visitHost(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
869 topUrls := `SELECT
870 host,
871 count(DISTINCT ip_address) as host_count
872 FROM analytics_visits
873 WHERE user_id = $1 AND host <> ''
874 GROUP BY host
875 ORDER BY host_count DESC`
876
877 intervals := []*db.VisitUrl{}
878 rs, err := me.Db.Queryx(topUrls, opts.UserID)
879 if err != nil {
880 return nil, err
881 }
882 defer func() { _ = rs.Close() }()
883
884 for rs.Next() {
885 interval := &db.VisitUrl{}
886 err := rs.Scan(
887 &interval.Url,
888 &interval.Count,
889 )
890 if err != nil {
891 return nil, err
892 }
893
894 intervals = append(intervals, interval)
895 }
896 if rs.Err() != nil {
897 return nil, rs.Err()
898 }
899 return intervals, nil
900}
901
902func (me *PsqlDB) VisitSummary(opts *db.SummaryOpts) (*db.SummaryVisits, error) {
903 visitors, err := me.visitUnique(opts)
904 if err != nil {
905 return nil, err
906 }
907
908 urls, err := me.visitUrl(opts)
909 if err != nil {
910 return nil, err
911 }
912
913 notFound, err := me.VisitUrlNotFound(opts)
914 if err != nil {
915 return nil, err
916 }
917
918 refs, err := me.visitReferer(opts)
919 if err != nil {
920 return nil, err
921 }
922
923 return &db.SummaryVisits{
924 Intervals: visitors,
925 TopUrls: urls,
926 TopReferers: refs,
927 NotFoundUrls: notFound,
928 }, nil
929}
930
931func (me *PsqlDB) FindVisitSiteList(opts *db.SummaryOpts) ([]*db.VisitUrl, error) {
932 return me.visitHost(opts)
933}
934
935func (me *PsqlDB) FindUsers() ([]*db.User, error) {
936 var users []*db.User
937 err := me.Db.Select(&users, `SELECT id, COALESCE(name, '') as name, created_at FROM app_users ORDER BY name ASC`)
938 if err != nil {
939 return nil, err
940 }
941 return users, nil
942}
943
944func (me *PsqlDB) removeTagsByPost(tx *sqlx.Tx, postID string) error {
945 _, err := tx.Exec(`DELETE FROM post_tags WHERE post_id = $1`, postID)
946 return err
947}
948
949func (me *PsqlDB) insertTagsByPost(tx *sqlx.Tx, tags []string, postID string) ([]string, error) {
950 ids := make([]string, 0)
951 for _, tag := range tags {
952 id := ""
953 err := tx.QueryRow(`INSERT INTO post_tags (post_id, name) VALUES($1, $2) RETURNING id;`, postID, tag).Scan(&id)
954 if err != nil {
955 return nil, err
956 }
957 ids = append(ids, id)
958 }
959
960 return ids, nil
961}
962
963func (me *PsqlDB) ReplaceTagsByPost(tags []string, postID string) error {
964 tx, err := me.Db.Beginx()
965 if err != nil {
966 return err
967 }
968 defer func() {
969 _ = tx.Rollback()
970 }()
971
972 err = me.removeTagsByPost(tx, postID)
973 if err != nil {
974 return err
975 }
976
977 _, err = me.insertTagsByPost(tx, tags, postID)
978 if err != nil {
979 return err
980 }
981
982 err = tx.Commit()
983 return err
984}
985
986func (me *PsqlDB) removeAliasesByPost(tx *sqlx.Tx, postID string) error {
987 _, err := tx.Exec(`DELETE FROM post_aliases WHERE post_id = $1`, postID)
988 return err
989}
990
991func (me *PsqlDB) insertAliasesByPost(tx *sqlx.Tx, aliases []string, postID string) ([]string, error) {
992 // hardcoded
993 denyList := []string{
994 "rss",
995 "rss.xml",
996 "rss.atom",
997 "atom.xml",
998 "feed.xml",
999 "smol.css",
1000 "main.css",
1001 "syntax.css",
1002 "card.png",
1003 "favicon-16x16.png",
1004 "favicon-32x32.png",
1005 "apple-touch-icon.png",
1006 "favicon.ico",
1007 "robots.txt",
1008 "atom",
1009 "blog/index.xml",
1010 }
1011
1012 ids := make([]string, 0)
1013 for _, alias := range aliases {
1014 if slices.Contains(denyList, alias) {
1015 me.Logger.Info(
1016 "name is in the deny list for aliases because it conflicts with a static route, skipping",
1017 "alias", alias,
1018 )
1019 continue
1020 }
1021 id := ""
1022 err := tx.QueryRow(`INSERT INTO post_aliases (post_id, slug) VALUES($1, $2) RETURNING id;`, postID, alias).Scan(&id)
1023 if err != nil {
1024 return nil, err
1025 }
1026 ids = append(ids, id)
1027 }
1028
1029 return ids, nil
1030}
1031
1032func (me *PsqlDB) ReplaceAliasesByPost(aliases []string, postID string) error {
1033 tx, err := me.Db.Beginx()
1034 if err != nil {
1035 return err
1036 }
1037 defer func() {
1038 _ = tx.Rollback()
1039 }()
1040
1041 err = me.removeAliasesByPost(tx, postID)
1042 if err != nil {
1043 return err
1044 }
1045
1046 _, err = me.insertAliasesByPost(tx, aliases, postID)
1047 if err != nil {
1048 return err
1049 }
1050
1051 err = tx.Commit()
1052 return err
1053}
1054
1055func (me *PsqlDB) FindUserPostsByTag(page *db.Pager, tag, userID, space string) (*db.Paginate[*db.Post], error) {
1056 var posts []*db.Post
1057 query := fmt.Sprintf(`
1058 SELECT %s
1059 FROM posts
1060 LEFT JOIN app_users ON app_users.id = posts.user_id
1061 LEFT JOIN post_tags ON post_tags.post_id = posts.id
1062 WHERE
1063 hidden = FALSE AND
1064 user_id = $1 AND
1065 (post_tags.name = $2 OR hidden = true) AND
1066 publish_at::date <= CURRENT_DATE AND
1067 cur_space = $3
1068 ORDER BY publish_at DESC
1069 LIMIT $4 OFFSET $5`, SelectPost)
1070 err := me.Db.Select(
1071 &posts,
1072 query,
1073 userID,
1074 tag,
1075 space,
1076 page.Num,
1077 page.Num*page.Page,
1078 )
1079 if err != nil {
1080 return nil, err
1081 }
1082
1083 var count int
1084 err = me.Db.QueryRow(`SELECT count(id) FROM posts WHERE hidden = FALSE AND cur_space=$1`, space).Scan(&count)
1085 if err != nil {
1086 return nil, err
1087 }
1088
1089 pager := &db.Paginate[*db.Post]{
1090 Data: posts,
1091 Total: int(math.Ceil(float64(count) / float64(page.Num))),
1092 }
1093 return pager, nil
1094}
1095
1096func (me *PsqlDB) FindPostsByTag(pager *db.Pager, tag, space string) (*db.Paginate[*db.Post], error) {
1097 query := `
1098 SELECT
1099 posts.id,
1100 user_id,
1101 filename,
1102 slug,
1103 title,
1104 text,
1105 description,
1106 publish_at,
1107 app_users.name as username,
1108 posts.updated_at,
1109 posts.mime_type
1110 FROM posts
1111 LEFT JOIN app_users ON app_users.id = posts.user_id
1112 LEFT JOIN post_tags ON post_tags.post_id = posts.id
1113 WHERE
1114 post_tags.name = $3 AND
1115 publish_at::date <= CURRENT_DATE AND
1116 cur_space = $4
1117 ORDER BY publish_at DESC
1118 LIMIT $1 OFFSET $2`
1119 rs, err := me.Db.Queryx(
1120 query,
1121 pager.Num,
1122 pager.Num*pager.Page,
1123 tag,
1124 space,
1125 )
1126 if err != nil {
1127 return nil, err
1128 }
1129 defer func() { _ = rs.Close() }()
1130
1131 return me.postPager(rs, pager.Num, space, tag)
1132}
1133
1134func (me *PsqlDB) FindPopularTags(space string) ([]string, error) {
1135 tags := make([]string, 0)
1136 query := `
1137 SELECT name, count(post_id) as "tally"
1138 FROM post_tags
1139 LEFT JOIN posts ON posts.id = post_id
1140 WHERE posts.cur_space = $1
1141 GROUP BY name
1142 ORDER BY tally DESC
1143 LIMIT 5`
1144 rs, err := me.Db.Queryx(query, space)
1145 if err != nil {
1146 return tags, err
1147 }
1148 defer func() { _ = rs.Close() }()
1149 for rs.Next() {
1150 name := ""
1151 tally := 0
1152 err := rs.Scan(&name, &tally)
1153 if err != nil {
1154 return tags, err
1155 }
1156
1157 tags = append(tags, name)
1158 }
1159 if rs.Err() != nil {
1160 return tags, rs.Err()
1161 }
1162 return tags, nil
1163}
1164
1165func (me *PsqlDB) FindFeature(userID string, feature string) (*db.FeatureFlag, error) {
1166 ff := &db.FeatureFlag{}
1167 err := me.Db.Get(ff, `SELECT * FROM feature_flags WHERE user_id = $1 AND name = $2 ORDER BY expires_at DESC LIMIT 1`, userID, feature)
1168 if err != nil {
1169 return nil, err
1170 }
1171 return ff, nil
1172}
1173
1174func (me *PsqlDB) FindFeaturesByUser(userID string) ([]*db.FeatureFlag, error) {
1175 var features []*db.FeatureFlag
1176 // https://stackoverflow.com/a/16920077
1177 query := `SELECT DISTINCT ON (name) *
1178 FROM feature_flags
1179 WHERE user_id=$1
1180 ORDER BY name, expires_at DESC;`
1181 err := me.Db.Select(&features, query, userID)
1182 if err != nil {
1183 return nil, err
1184 }
1185 return features, nil
1186}
1187
1188func (me *PsqlDB) HasFeatureByUser(userID string, feature string) bool {
1189 ff, err := me.FindFeature(userID, feature)
1190 if err != nil {
1191 return false
1192 }
1193 return ff.IsValid()
1194}
1195
1196func (me *PsqlDB) InsertFeedItems(postID string, items []*db.FeedItem) error {
1197 tx, err := me.Db.Beginx()
1198 if err != nil {
1199 return err
1200 }
1201 defer func() {
1202 _ = tx.Rollback()
1203 }()
1204
1205 for _, item := range items {
1206 _, err := tx.Exec(
1207 `INSERT INTO feed_items (post_id, guid, data) VALUES ($1, $2, $3) RETURNING id;`,
1208 item.PostID,
1209 item.GUID,
1210 item.Data,
1211 )
1212 if err != nil {
1213 return fmt.Errorf(
1214 "post id:%s, link:%s, guid:%s, err:%w",
1215 item.PostID, item.Data.Link, item.GUID, err,
1216 )
1217 }
1218 }
1219
1220 err = tx.Commit()
1221 return err
1222}
1223
1224func (me *PsqlDB) FindFeedItemsByPostID(postID string) ([]*db.FeedItem, error) {
1225 var items []*db.FeedItem
1226 err := me.Db.Select(&items, `SELECT * FROM feed_items WHERE post_id=$1`, postID)
1227 if err != nil {
1228 return nil, err
1229 }
1230 return items, nil
1231}
1232
1233func (me *PsqlDB) InsertProject(userID, name, projectDir string) (string, error) {
1234 if !shared.IsValidSubdomain(name) {
1235 return "", fmt.Errorf("'%s' is not a valid project name, must match /^[a-z0-9-]+$/", name)
1236 }
1237
1238 var id string
1239 err := me.Db.QueryRow(`INSERT INTO projects (user_id, name, project_dir) VALUES ($1, $2, $3) RETURNING id;`, userID, name, projectDir).Scan(&id)
1240 if err != nil {
1241 return "", err
1242 }
1243 return id, nil
1244}
1245
1246func (me *PsqlDB) UpdateProject(userID, name string) error {
1247 _, err := me.Db.Exec(`UPDATE projects SET updated_at = $3 WHERE user_id = $1 AND name = $2;`, userID, name, time.Now())
1248 return err
1249}
1250
1251func (me *PsqlDB) FindProjectByName(userID, name string) (*db.Project, error) {
1252 project := &db.Project{}
1253 err := me.Db.Get(project, `SELECT * FROM projects WHERE user_id = $1 AND name = $2;`, userID, name)
1254 if err != nil {
1255 return nil, err
1256 }
1257 return project, nil
1258}
1259
1260func (me *PsqlDB) InsertToken(userID, name string) (string, error) {
1261 var token string
1262 err := me.Db.QueryRow(`INSERT INTO tokens (user_id, name) VALUES($1, $2) RETURNING token;`, userID, name).Scan(&token)
1263 if err != nil {
1264 return "", err
1265 }
1266 return token, nil
1267}
1268
1269func (me *PsqlDB) UpsertToken(userID, name string) (string, error) {
1270 token, _ := me.findTokenByName(userID, name)
1271 if token != "" {
1272 return token, nil
1273 }
1274
1275 token, err := me.InsertToken(userID, name)
1276 return token, err
1277}
1278
1279func (me *PsqlDB) findTokenByName(userID, name string) (string, error) {
1280 var token string
1281 err := me.Db.QueryRow(`SELECT token FROM tokens WHERE user_id = $1 AND name = $2`, userID, name).Scan(&token)
1282 if err != nil {
1283 return "", err
1284 }
1285 return token, nil
1286}
1287
1288func (me *PsqlDB) RemoveToken(tokenID string) error {
1289 _, err := me.Db.Exec(`DELETE FROM tokens WHERE id = $1`, tokenID)
1290 return err
1291}
1292
1293func (me *PsqlDB) FindTokensByUser(userID string) ([]*db.Token, error) {
1294 var tokens []*db.Token
1295 err := me.Db.Select(&tokens, `SELECT * FROM tokens WHERE user_id = $1`, userID)
1296 if err != nil {
1297 return nil, err
1298 }
1299 return tokens, nil
1300}
1301
1302func (me *PsqlDB) InsertFeature(userID, name string, expiresAt time.Time) (*db.FeatureFlag, error) {
1303 var featureID string
1304 err := me.Db.QueryRow(
1305 `INSERT INTO feature_flags (user_id, name, expires_at) VALUES ($1, $2, $3) RETURNING id;`,
1306 userID,
1307 name,
1308 expiresAt,
1309 ).Scan(&featureID)
1310 if err != nil {
1311 return nil, err
1312 }
1313
1314 feature, err := me.FindFeature(userID, name)
1315 if err != nil {
1316 return nil, err
1317 }
1318
1319 return feature, nil
1320}
1321
1322func (me *PsqlDB) RemoveFeature(userID string, name string) error {
1323 _, err := me.Db.Exec(`DELETE FROM feature_flags WHERE user_id = $1 AND name = $2`, userID, name)
1324 return err
1325}
1326
1327func (me *PsqlDB) createFeatureExpiresAt(userID, name string) time.Time {
1328 ff, _ := me.FindFeature(userID, name)
1329 // if the feature flag has already expired we don't want to add a year to it since that will
1330 // not grant the user a full year
1331 if ff == nil || !ff.IsValid() {
1332 t := time.Now()
1333 return t.AddDate(1, 0, 0)
1334 }
1335 return ff.ExpiresAt.AddDate(1, 0, 0)
1336}
1337
1338func (me *PsqlDB) AddPicoPlusUser(username, email, paymentType, txId string) error {
1339 user, err := me.FindUserByName(username)
1340 if err != nil {
1341 return err
1342 }
1343
1344 tx, err := me.Db.Beginx()
1345 if err != nil {
1346 return err
1347 }
1348 defer func() {
1349 _ = tx.Rollback()
1350 }()
1351
1352 var paymentHistoryId sql.NullString
1353 if paymentType != "" {
1354 data := db.PaymentHistoryData{
1355 Notes: "",
1356 TxID: txId,
1357 }
1358
1359 err := tx.QueryRow(
1360 `INSERT INTO payment_history (user_id, payment_type, amount, data) VALUES ($1, $2, 24 * 1000000, $3) RETURNING id;`,
1361 user.ID,
1362 paymentType,
1363 data,
1364 ).Scan(&paymentHistoryId)
1365 if err != nil {
1366 return err
1367 }
1368 }
1369
1370 plus := me.createFeatureExpiresAt(user.ID, "plus")
1371 plusQuery := fmt.Sprintf(`INSERT INTO feature_flags (user_id, name, data, expires_at, payment_history_id)
1372 VALUES ($1, 'plus', '{"storage_max":10000000000, "file_max":50000000, "email": "%s"}'::jsonb, $2, $3);`, email)
1373 _, err = tx.Exec(plusQuery, user.ID, plus, paymentHistoryId)
1374 if err != nil {
1375 return err
1376 }
1377
1378 return tx.Commit()
1379}
1380
1381func (me *PsqlDB) UpsertProject(userID, projectName, projectDir string) (*db.Project, error) {
1382 project, err := me.FindProjectByName(userID, projectName)
1383 if err == nil {
1384 // this just updates the `createdAt` timestamp, useful for book-keeping
1385 err = me.UpdateProject(userID, projectName)
1386 if err != nil {
1387 me.Logger.Error("could not update project", "err", err)
1388 return nil, err
1389 }
1390 return project, nil
1391 }
1392
1393 _, err = me.InsertProject(userID, projectName, projectName)
1394 if err != nil {
1395 me.Logger.Error("could not create project", "err", err)
1396 return nil, err
1397 }
1398 return me.FindProjectByName(userID, projectName)
1399}
1400
1401func (me *PsqlDB) findPagesStats(userID string) (*db.UserServiceStats, error) {
1402 stats := db.UserServiceStats{
1403 Service: "pgs",
1404 }
1405 err := me.Db.QueryRow(
1406 `SELECT count(id), min(created_at), max(created_at), max(updated_at) FROM projects WHERE user_id=$1`,
1407 userID,
1408 ).Scan(&stats.Num, &stats.FirstCreatedAt, &stats.LastestCreatedAt, &stats.LatestUpdatedAt)
1409 if err != nil {
1410 return nil, err
1411 }
1412
1413 return &stats, nil
1414}
1415
1416func (me *PsqlDB) InsertTunsEventLog(log *db.TunsEventLog) error {
1417 _, err := me.Db.Exec(
1418 `INSERT INTO tuns_event_logs
1419 (user_id, server_id, remote_addr, event_type, tunnel_type, connection_type, tunnel_id)
1420 VALUES
1421 ($1, $2, $3, $4, $5, $6, $7)`,
1422 log.UserId, log.ServerID, log.RemoteAddr, log.EventType, log.TunnelType,
1423 log.ConnectionType, log.TunnelID,
1424 )
1425 return err
1426}
1427
1428func (me *PsqlDB) FindTunsEventLogsByAddr(userID, addr string) ([]*db.TunsEventLog, error) {
1429 var logs []*db.TunsEventLog
1430 err := me.Db.Select(&logs,
1431 `SELECT * FROM tuns_event_logs WHERE user_id=$1 AND tunnel_id=$2 ORDER BY created_at DESC`, userID, addr)
1432 if err != nil {
1433 return nil, err
1434 }
1435 return logs, nil
1436}
1437
1438func (me *PsqlDB) FindTunsEventLogs(userID string) ([]*db.TunsEventLog, error) {
1439 var logs []*db.TunsEventLog
1440 err := me.Db.Select(&logs,
1441 `SELECT * FROM tuns_event_logs WHERE user_id=$1 ORDER BY created_at DESC`, userID)
1442 if err != nil {
1443 return nil, err
1444 }
1445 return logs, nil
1446}
1447
1448func (me *PsqlDB) FindUserStats(userID string) (*db.UserStats, error) {
1449 stats := db.UserStats{}
1450 rs, err := me.Db.Queryx(`SELECT cur_space, count(id), min(created_at), max(created_at), max(updated_at) FROM posts WHERE user_id=$1 GROUP BY cur_space`, userID)
1451 if err != nil {
1452 return nil, err
1453 }
1454 defer func() { _ = rs.Close() }()
1455
1456 for rs.Next() {
1457 stat := db.UserServiceStats{}
1458 err := rs.Scan(&stat.Service, &stat.Num, &stat.FirstCreatedAt, &stat.LastestCreatedAt, &stat.LatestUpdatedAt)
1459 if err != nil {
1460 return nil, err
1461 }
1462 switch stat.Service {
1463 case "prose":
1464 stats.Prose = stat
1465 case "pastes":
1466 stats.Pastes = stat
1467 case "feeds":
1468 stats.Feeds = stat
1469 }
1470 }
1471
1472 if rs.Err() != nil {
1473 return nil, rs.Err()
1474 }
1475
1476 pgs, err := me.findPagesStats(userID)
1477 if err != nil {
1478 return nil, err
1479 }
1480 stats.Pages = *pgs
1481 return &stats, err
1482}
1483
1484func (me *PsqlDB) FindAccessLogs(userID string, fromDate *time.Time) ([]*db.AccessLog, error) {
1485 var logs []*db.AccessLog
1486 err := me.Db.Select(&logs, `SELECT * FROM access_logs WHERE user_id=$1 AND created_at >= $2 ORDER BY created_at ASC`, userID, fromDate)
1487 if err != nil {
1488 return nil, err
1489 }
1490 return logs, nil
1491}
1492
1493func (me *PsqlDB) FindAccessLogsByPubkey(pubkey string, fromDate *time.Time) ([]*db.AccessLog, error) {
1494 var logs []*db.AccessLog
1495 err := me.Db.Select(&logs, `SELECT * FROM access_logs WHERE pubkey=$1 AND created_at >= $2 ORDER BY created_at ASC`, pubkey, fromDate)
1496 if err != nil {
1497 return nil, err
1498 }
1499 return logs, nil
1500}
1501
1502func (me *PsqlDB) FindPubkeysInAccessLogs(userID string) ([]string, error) {
1503 var pubkeys []string
1504 err := me.Db.Select(&pubkeys, `SELECT DISTINCT(pubkey) FROM access_logs WHERE user_id=$1`, userID)
1505 if err != nil {
1506 return nil, err
1507 }
1508 return pubkeys, nil
1509}
1510
1511func (me *PsqlDB) InsertAccessLog(log *db.AccessLog) error {
1512 _, err := me.Db.Exec(
1513 `INSERT INTO access_logs (user_id, service, pubkey, identity) VALUES ($1, $2, $3, $4);`,
1514 log.UserID,
1515 log.Service,
1516 log.Pubkey,
1517 log.Identity,
1518 )
1519 return err
1520}
1521
1522func (me *PsqlDB) UpsertPipeMonitor(userID, topic string, dur time.Duration, winEnd *time.Time) error {
1523 durStr := fmt.Sprintf("%d seconds", int64(dur.Seconds()))
1524 _, err := me.Db.Exec(
1525 `INSERT INTO pipe_monitors (user_id, topic, window_dur, window_end)
1526 VALUES ($1, $2, $3::interval, $4)
1527 ON CONFLICT (user_id, topic) DO UPDATE SET window_dur = $3::interval, window_end = $4, updated_at = NOW();`,
1528 userID,
1529 topic,
1530 durStr,
1531 winEnd,
1532 )
1533 return err
1534}
1535
1536func (me *PsqlDB) UpdatePipeMonitorLastPing(userID, topic string, lastPing *time.Time) error {
1537 _, err := me.Db.Exec(
1538 `UPDATE pipe_monitors SET last_ping = $3, updated_at = NOW() WHERE user_id = $1 AND topic = $2;`,
1539 userID,
1540 topic,
1541 lastPing,
1542 )
1543 return err
1544}
1545
1546func (me *PsqlDB) RemovePipeMonitor(userID, topic string) error {
1547 _, err := me.Db.Exec(
1548 `DELETE FROM pipe_monitors WHERE user_id = $1 AND topic = $2;`,
1549 userID,
1550 topic,
1551 )
1552 return err
1553}
1554
1555func (me *PsqlDB) FindPipeMonitorByTopic(userID, topic string) (*db.PipeMonitor, error) {
1556 monitor := &db.PipeMonitor{}
1557 err := me.Db.Get(monitor, `SELECT id, user_id, topic, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors WHERE user_id = $1 AND topic = $2;`, userID, topic)
1558 if err != nil {
1559 return nil, err
1560 }
1561 return monitor, nil
1562}
1563
1564func (me *PsqlDB) FindPipeMonitorsByUser(userID string) ([]*db.PipeMonitor, error) {
1565 var monitors []*db.PipeMonitor
1566 err := me.Db.Select(&monitors, `SELECT id, user_id, topic, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors WHERE user_id = $1 ORDER BY topic;`, userID)
1567 if err != nil {
1568 return nil, err
1569 }
1570 return monitors, nil
1571}
1572
1573func (me *PsqlDB) InsertPipeMonitorHistory(monitorID string, windowDur time.Duration, windowEnd, lastPing *time.Time) error {
1574 durStr := fmt.Sprintf("%d seconds", int64(windowDur.Seconds()))
1575 _, err := me.Db.Exec(
1576 `INSERT INTO pipe_monitors_history (monitor_id, window_dur, window_end, last_ping) VALUES ($1, $2::interval, $3, $4)`,
1577 monitorID, durStr, windowEnd, lastPing,
1578 )
1579 return err
1580}
1581
1582func (me *PsqlDB) FindPipeMonitorHistory(monitorID string, from, to time.Time) ([]*db.PipeMonitorHistory, error) {
1583 var history []*db.PipeMonitorHistory
1584 err := me.Db.Select(
1585 &history,
1586 `SELECT id, monitor_id, (EXTRACT(EPOCH FROM window_dur) * 1000000000)::bigint as window_dur, window_end, last_ping, created_at, updated_at FROM pipe_monitors_history WHERE monitor_id = $1 AND last_ping <= $2 AND window_end >= $3 ORDER BY last_ping ASC`,
1587 monitorID, to, from,
1588 )
1589 if err != nil {
1590 return nil, err
1591 }
1592 return history, nil
1593}