package migration

import (
	"context"
	"fmt"
	"strings"

	"github.com/jackc/pgx/v5/pgxpool"
)

// ImportedItemStore tracks imported source IDs and optional relative paths for a migration job.
// Data lives in migration_imported_items instead of unbounded cursor_json maps.
//
// The in-memory maps mirror the table for the lifetime of the store. No locking
// is performed here, so callers must serialize access to a store.
type ImportedItemStore struct {
	db    *pgxpool.Pool
	jobID string
	done  map[string]struct{} // imported or skipped — resume skips these
	paths map[string]string   // source ID -> relative path, only for non-empty paths
}

// NewImportedItemStoreMemory returns a store with no database behind it: every
// mutation only touches the in-memory maps.
func NewImportedItemStoreMemory() *ImportedItemStore {
	return &ImportedItemStore{
		done:  map[string]struct{}{},
		paths: map[string]string{},
	}
}

// LoadImportedItemStore builds the store for jobID from migration_imported_items.
//
// When db is nil or jobID is empty nothing is read and the store behaves like a
// memory-only store. Any legacy "imported_ids" / "imported_paths" entries found
// in cursor are copied into the store (and persisted when a database is
// configured); those two keys are then removed from cursor in place.
func LoadImportedItemStore(ctx context.Context, db *pgxpool.Pool, jobID string, cursor map[string]any) (*ImportedItemStore, error) {
	store := &ImportedItemStore{
		db:    db,
		jobID: jobID,
		done:  map[string]struct{}{},
		paths: map[string]string{},
	}
	if db != nil && jobID != "" {
		if err := store.loadRows(ctx); err != nil {
			return nil, fmt.Errorf("load imported items for job %q: %w", jobID, err)
		}
	}
	if err := store.migrateLegacyCursor(ctx, cursor); err != nil {
		return nil, err
	}
	stripImportedCursorKeys(cursor)
	return store, nil
}

// loadRows fills the in-memory maps from the rows stored for s.jobID.
func (s *ImportedItemStore) loadRows(ctx context.Context) error {
	rows, err := s.db.Query(ctx, `
		SELECT source_id, rel_path, status
		FROM migration_imported_items
		WHERE job_id = $1::uuid
	`, s.jobID)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var sourceID, relPath, status string
		if err := rows.Scan(&sourceID, &relPath, &status); err != nil {
			return err
		}
		if isImportedItemDone(status) {
			s.done[sourceID] = struct{}{}
		}
		if relPath != "" {
			s.paths[sourceID] = relPath
		}
	}
	return rows.Err()
}

// isImportedItemDone reports whether status marks an item that resume should
// skip: imported, skipped, or an empty status.
// NOTE(review): the empty status presumably covers rows written before the
// status column was populated — confirm against the table's migration history.
func isImportedItemDone(status string) bool {
	switch status {
	case "", ItemStatusImported, ItemStatusSkipped:
		return true
	default:
		return false
	}
}

// stripImportedCursorKeys removes the legacy bookkeeping keys from cursor in
// place. A nil cursor is left alone.
func stripImportedCursorKeys(cursor map[string]any) {
	if cursor == nil {
		return
	}
	delete(cursor, "imported_ids")
	delete(cursor, "imported_paths")
}

// Has reports whether id is recorded as imported or skipped.
func (s *ImportedItemStore) Has(id string) bool {
	_, ok := s.done[id]
	return ok
}

// Path returns the relative path recorded for id, or "" when none is known.
func (s *ImportedItemStore) Path(id string) string {
	return s.paths[id]
}

// MarkImported records id as imported without supplying a relative path.
func (s *ImportedItemStore) MarkImported(ctx context.Context, id string) error {
	return s.upsertItem(ctx, id, "", ItemStatusImported, "")
}

// MarkPath records id as imported at relPath.
func (s *ImportedItemStore) MarkPath(ctx context.Context, id, relPath string) error {
	return s.upsertItem(ctx, id, relPath, ItemStatusImported, "")
}

// MarkSkipped records id as skipped for the given reason.
func (s *ImportedItemStore) MarkSkipped(ctx context.Context, id, reason, relPath string) error {
	return s.upsertItem(ctx, id, relPath, ItemStatusSkipped, reason)
}

// MarkFailed records id as failed for the given reason. The item is removed
// from the in-memory done set and path map, so Has(id) reports false
// afterwards; the row itself is kept (with relPath and reason) when a database
// is configured.
func (s *ImportedItemStore) MarkFailed(ctx context.Context, id, reason, relPath string) error {
	delete(s.done, id)
	delete(s.paths, id)
	if id == "" {
		return nil
	}
	return s.persistItem(ctx, id, relPath, ItemStatusFailed, reason)
}

// upsertItem updates the in-memory state for id and then persists the row.
// An empty id is ignored.
func (s *ImportedItemStore) upsertItem(ctx context.Context, id, relPath, status, reason string) error {
	if id == "" {
		return nil
	}
	if isImportedItemDone(status) {
		s.done[id] = struct{}{}
	} else {
		delete(s.done, id)
	}
	if relPath != "" {
		s.paths[id] = relPath
	} else {
		// The in-memory path survives a call without a path, so persist that same
		// path; otherwise the upsert would overwrite rel_path with "" and the path
		// would be lost on the next load.
		relPath = s.paths[id]
	}
	return s.persistItem(ctx, id, relPath, status, reason)
}

// persistItem writes (or overwrites) the row for id. It is a no-op when the
// store has no database or no job ID.
func (s *ImportedItemStore) persistItem(ctx context.Context, id, relPath, status, reason string) error {
	if s.db == nil || s.jobID == "" {
		return nil
	}
	_, err := s.db.Exec(ctx, `
		INSERT INTO migration_imported_items (job_id, source_id, rel_path, status, reason)
		VALUES ($1::uuid, $2, $3, $4, $5)
		ON CONFLICT (job_id, source_id) DO UPDATE SET
			rel_path = EXCLUDED.rel_path,
			status = EXCLUDED.status,
			reason = EXCLUDED.reason,
			imported_at = NOW()
	`, s.jobID, id, relPath, status, truncateReason(reason))
	return err
}

// truncateReason trims surrounding whitespace and caps reason at 2000 bytes.
// The cut never lands inside a multi-byte UTF-8 sequence, so valid input stays
// valid after truncation.
func truncateReason(reason string) string {
	const maxLen = 2000
	reason = strings.TrimSpace(reason)
	if len(reason) <= maxLen {
		return reason
	}
	cut := maxLen
	// Bytes of the form 10xxxxxx are UTF-8 continuation bytes. If the first
	// dropped byte is one, the cut would split a rune: back up to its start.
	for cut > 0 && reason[cut]&0xC0 == 0x80 {
		cut--
	}
	return reason[:cut]
}

// Unmark forgets id entirely: it is removed from the in-memory maps and, when a
// database is configured, its row is deleted. An empty id is ignored.
func (s *ImportedItemStore) Unmark(ctx context.Context, id string) error {
	if id == "" {
		return nil
	}
	delete(s.done, id)
	delete(s.paths, id)
	if s.db == nil || s.jobID == "" {
		return nil
	}
	_, err := s.db.Exec(ctx, `
		DELETE FROM migration_imported_items
		WHERE job_id = $1::uuid AND source_id = $2
	`, s.jobID, id)
	return err
}

// migrateLegacyCursor copies the legacy "imported_ids" and "imported_paths"
// cursor maps into the store. Every ID present in either map is marked
// imported; IDs with a non-empty string path also get that path recorded.
func (s *ImportedItemStore) migrateLegacyCursor(ctx context.Context, cursor map[string]any) error {
	if cursor == nil {
		return nil
	}
	rawIDs, _ := cursor["imported_ids"].(map[string]any)
	rawPaths, _ := cursor["imported_paths"].(map[string]any)
	if len(rawIDs) == 0 && len(rawPaths) == 0 {
		return nil
	}

	// Union of both key sets so an ID listed in both maps is migrated once.
	seen := make(map[string]struct{}, len(rawIDs)+len(rawPaths))
	for id := range rawIDs {
		seen[id] = struct{}{}
	}
	for id := range rawPaths {
		seen[id] = struct{}{}
	}

	for id := range seen {
		// A missing or non-string path yields "", which falls through to MarkImported.
		relPath, _ := rawPaths[id].(string)
		if relPath != "" {
			if err := s.MarkPath(ctx, id, relPath); err != nil {
				return fmt.Errorf("migrate imported path %q: %w", id, err)
			}
			continue
		}
		if err := s.MarkImported(ctx, id); err != nil {
			return fmt.Errorf("migrate imported id %q: %w", id, err)
		}
	}
	return nil
}