repos / pico

pico services mono repo
git clone https://github.com/picosh/pico.git

pico / cmd / scripts / migrate
Eric Bower  ·  2025-12-17

migrate.go

  1package main
  2
  3import (
  4	"context"
  5	"database/sql"
  6	"fmt"
  7	"log/slog"
  8	"os"
  9
 10	"github.com/jmoiron/sqlx"
 11	"github.com/picosh/pico/pkg/db"
 12	"github.com/picosh/pico/pkg/db/postgres"
 13	"github.com/picosh/pico/pkg/shared"
 14)
 15
 16func findPosts(dbpool *sqlx.DB) ([]*db.Post, error) {
 17	var posts []*db.Post
 18	rs, err := dbpool.Query(`SELECT
 19		posts.id, user_id, filename, title, text, description,
 20		posts.created_at, publish_at, posts.updated_at, hidden, COALESCE(views, 0) as views
 21		FROM posts
 22		LEFT OUTER JOIN post_analytics ON post_analytics.post_id = posts.id
 23	`)
 24	if err != nil {
 25		return posts, err
 26	}
 27	for rs.Next() {
 28		post := &db.Post{}
 29		err := rs.Scan(
 30			&post.ID,
 31			&post.UserID,
 32			&post.Filename,
 33			&post.Title,
 34			&post.Text,
 35			&post.Description,
 36			&post.CreatedAt,
 37			&post.PublishAt,
 38			&post.UpdatedAt,
 39			&post.Hidden,
 40			&post.Views,
 41		)
 42		if err != nil {
 43			return posts, err
 44		}
 45
 46		posts = append(posts, post)
 47	}
 48	if rs.Err() != nil {
 49		return posts, rs.Err()
 50	}
 51	return posts, nil
 52}
 53
 54func insertUser(tx *sql.Tx, user *db.User) error {
 55	_, err := tx.Exec(
 56		"INSERT INTO app_users (id, name, created_at) VALUES($1, $2, $3)",
 57		user.ID,
 58		user.Name,
 59		user.CreatedAt,
 60	)
 61	return err
 62}
 63
 64func insertPublicKey(tx *sql.Tx, pk *db.PublicKey) error {
 65	_, err := tx.Exec(
 66		"INSERT INTO public_keys (id, user_id, public_key, created_at) VALUES ($1, $2, $3, $4)",
 67		pk.ID,
 68		pk.UserID,
 69		pk.Key,
 70		pk.CreatedAt,
 71	)
 72	return err
 73}
 74
 75func insertPost(tx *sql.Tx, post *db.Post) error {
 76	_, err := tx.Exec(
 77		`INSERT INTO posts
 78			(id, user_id, title, text, created_at, publish_at, updated_at, description, filename, hidden, cur_space, views)
 79			VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
 80		post.ID,
 81		post.UserID,
 82		post.Title,
 83		post.Text,
 84		post.CreatedAt,
 85		post.PublishAt,
 86		post.UpdatedAt,
 87		post.Description,
 88		post.Filename,
 89		post.Hidden,
 90		post.Space,
 91		post.Views,
 92	)
 93	return err
 94}
 95
 96type ConflictData struct {
 97	User          *db.User
 98	Pks           []*db.PublicKey
 99	ReplaceWithID string
100}
101
102func main() {
103	logger := slog.Default()
104
105	listsCfg := shared.NewConfigSite()
106	listsCfg.Logger = logger
107	listsCfg.DbURL = os.Getenv("LISTS_DB_URL")
108	listsDb := postgres.NewDB(listsCfg.DbURL, listsCfg.Logger)
109
110	proseCfg := shared.NewConfigSite()
111	proseCfg.DbURL = os.Getenv("PROSE_DB_URL")
112	proseCfg.Logger = logger
113	proseDb := postgres.NewDB(proseCfg.DbURL, proseCfg.Logger)
114
115	picoCfg := shared.NewConfigSite()
116	picoCfg.Logger = logger
117	picoCfg.DbURL = os.Getenv("PICO_DB_URL")
118	picoDb := postgres.NewDB(picoCfg.DbURL, picoCfg.Logger)
119
120	ctx := context.Background()
121	tx, err := picoDb.Db.BeginTx(ctx, nil)
122	if err != nil {
123		panic(err)
124	}
125	defer func() {
126		err = tx.Rollback()
127	}()
128
129	logger.Info("Finding prose users")
130	proseUsers, err := proseDb.FindUsers()
131	if err != nil {
132		panic(err)
133	}
134
135	logger.Info("Finding lists users")
136	listUsers, err := listsDb.FindUsers()
137	if err != nil {
138		panic(err)
139	}
140
141	logger.Info("Adding prose users and public keys to PICO db")
142	userMap := map[string]*db.User{}
143	for _, proseUser := range proseUsers {
144		userMap[proseUser.Name] = proseUser
145
146		err = insertUser(tx, proseUser)
147		if err != nil {
148			panic(err)
149		}
150
151		proseKeys, err := proseDb.FindKeysByUser(proseUser)
152		if err != nil {
153			panic(err)
154		}
155
156		for _, prosePK := range proseKeys {
157			err = insertPublicKey(tx, prosePK)
158			if err != nil {
159				panic(err)
160			}
161		}
162	}
163
164	noconflicts := []*ConflictData{}
165	conflicts := []*ConflictData{}
166	updateIDs := []*ConflictData{}
167	logger.Info("Finding conflicts")
168	for _, listUser := range listUsers {
169		listKeys, err := listsDb.FindKeysByUser(listUser)
170		if err != nil {
171			panic(err)
172		}
173
174		data := &ConflictData{
175			User: listUser,
176			Pks:  listKeys,
177		}
178
179		if userMap[listUser.Name] == nil {
180			noconflicts = append(noconflicts, data)
181			continue
182		} else {
183			proseUser := userMap[listUser.Name]
184			proseKeys, err := proseDb.FindKeysByUser(proseUser)
185			if err != nil {
186				panic(err)
187			}
188
189			if len(listKeys) != len(proseKeys) {
190				conflicts = append(conflicts, data)
191				continue
192			}
193
194			pkMap := map[string]bool{}
195			for _, prosePK := range proseKeys {
196				pkMap[prosePK.Key] = true
197			}
198
199			conflicted := false
200			for _, listPK := range listKeys {
201				if !pkMap[listPK.Key] {
202					conflicted = true
203					conflicts = append(conflicts, data)
204					break
205				}
206			}
207
208			if !conflicted {
209				data.ReplaceWithID = proseUser.ID
210				updateIDs = append(updateIDs, data)
211			}
212		}
213	}
214
215	logger.Info("adding records with no conflicts", "len", len(noconflicts))
216	for _, data := range noconflicts {
217		err = insertUser(tx, data.User)
218		if err != nil {
219			panic(err)
220		}
221
222		for _, pk := range data.Pks {
223			err = insertPublicKey(tx, pk)
224			if err != nil {
225				panic(err)
226			}
227		}
228	}
229
230	logger.Info("adding records with conflicts", "len", len(conflicts))
231	for _, data := range conflicts {
232		data.User.Name = fmt.Sprintf("%stmp", data.User.Name)
233		err = insertUser(tx, data.User)
234		if err != nil {
235			panic(err)
236		}
237
238		for _, pk := range data.Pks {
239			err = insertPublicKey(tx, pk)
240			if err != nil {
241				panic(err)
242			}
243		}
244	}
245
246	prosePosts, err := findPosts(proseDb.Db)
247	if err != nil {
248		panic(err)
249	}
250
251	logger.Info("Adding posts from prose.sh")
252	for _, post := range prosePosts {
253		post.Space = "prose"
254		err = insertPost(tx, post)
255		if err != nil {
256			panic(err)
257		}
258	}
259
260	listPosts, err := findPosts(listsDb.Db)
261	if err != nil {
262		panic(err)
263	}
264
265	logger.Info("Adding posts from lists.sh")
266	for _, post := range listPosts {
267		updated := false
268		for _, alreadyAdded := range updateIDs {
269			if post.UserID == alreadyAdded.User.ID {
270				// we need to change the ID for these posts to the prose user id
271				// because we were able to determine it was the same user
272				post.UserID = alreadyAdded.ReplaceWithID
273				post.Space = "lists"
274				err = insertPost(tx, post)
275				if err != nil {
276					panic(err)
277				}
278				updated = true
279				break
280			}
281		}
282
283		if updated {
284			continue
285		}
286
287		post.Space = "lists"
288		err = insertPost(tx, post)
289		if err != nil {
290			panic(err)
291		}
292	}
293
294	logger.Info("Committing transactions to PICO db")
295	// Commit the transaction.
296	if err = tx.Commit(); err != nil {
297		panic(err)
298	}
299}