ultisuite-backend/internal/migration/import_helpers.go
R3D347HR4Y 951c88b1ca
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
feat(migration): graph childFolders, parent FK, B2B hardening
- Graph mail: discover nested childFolders, merge new folders into
  cached graphFolderQueue without breaking in-progress cursors
- Add mail_folders.parent_id (migration 000050) and wire hierarchy on import
- Shared drives: skip discovery on delta ticks, guard merge by project
- Provision: remove platform-domain email rewrite on claim
- Integration tests for nested folders, parent_id, delta childFolders mocks
2026-06-13 13:16:36 +02:00

217 lines
5.3 KiB
Go

package migration
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/credentials"
)
type progressUpdater func(status string, cursor, stats map[string]any, jobErr string) error
type migrationUser struct {
Email string
ExternalID string
Name string
}
func resolveMigrationUser(ctx context.Context, db *pgxpool.Pool, userID string) (migrationUser, error) {
var u migrationUser
err := db.QueryRow(ctx, `
SELECT COALESCE(email, ''), COALESCE(external_id, ''), COALESCE(name, '')
FROM users WHERE id = $1::uuid
`, userID).Scan(&u.Email, &u.ExternalID, &u.Name)
if err != nil {
return migrationUser{}, fmt.Errorf("migration user not found")
}
if u.Email == "" {
return migrationUser{}, fmt.Errorf("migration user email missing")
}
return u, nil
}
func migrationHTTPClient() *http.Client {
return &http.Client{Timeout: 90 * time.Second}
}
func apiGet(ctx context.Context, client *http.Client, url, accessToken string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+accessToken)
resp, err := migrationDo(ctx, client, req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
func alreadyImported(store *ImportedItemStore, id string) bool {
if store == nil {
return false
}
return store.Has(id)
}
func calendarSyncTokens(cursor map[string]any) map[string]string {
raw, _ := cursor["calendarSyncTokens"].(map[string]any)
out := make(map[string]string, len(raw))
for k, v := range raw {
if s, ok := v.(string); ok && s != "" {
out[k] = s
}
}
return out
}
func setCalendarSyncToken(cursor map[string]any, calID, token string) {
if calID == "" || token == "" {
return
}
raw, _ := cursor["calendarSyncTokens"].(map[string]any)
if raw == nil {
raw = map[string]any{}
cursor["calendarSyncTokens"] = raw
}
raw[calID] = token
}
func calendarDeltaLinks(cursor map[string]any) map[string]string {
raw, _ := cursor["calendarDeltaLinks"].(map[string]any)
out := make(map[string]string, len(raw))
for k, v := range raw {
if s, ok := v.(string); ok && s != "" {
out[k] = s
}
}
return out
}
func setCalendarDeltaLink(cursor map[string]any, calID, link string) {
if calID == "" || link == "" {
return
}
raw, _ := cursor["calendarDeltaLinks"].(map[string]any)
if raw == nil {
raw = map[string]any{}
cursor["calendarDeltaLinks"] = raw
}
raw[calID] = link
}
func graphFolderDeltaLinks(cursor map[string]any) map[string]string {
raw, _ := cursor["folderDeltaLinks"].(map[string]any)
out := make(map[string]string, len(raw))
for k, v := range raw {
if s, ok := v.(string); ok && s != "" {
out[k] = s
}
}
return out
}
func setGraphFolderDeltaLink(cursor map[string]any, folderID, link string) {
if folderID == "" || link == "" {
return
}
raw, _ := cursor["folderDeltaLinks"].(map[string]any)
if raw == nil {
raw = map[string]any{}
cursor["folderDeltaLinks"] = raw
}
raw[folderID] = link
}
func readGraphFolderQueue(cursor map[string]any) []string {
raw, _ := cursor["graphFolderQueue"].([]any)
out := make([]string, 0, len(raw))
for _, v := range raw {
if s, ok := v.(string); ok && s != "" {
out = append(out, s)
}
}
return out
}
func writeGraphFolderQueue(cursor map[string]any, ids []string) {
queue := make([]any, 0, len(ids))
for _, id := range ids {
if id != "" {
queue = append(queue, id)
}
}
cursor["graphFolderQueue"] = queue
}
// mergeGraphFolderQueue extends a cached import queue with newly discovered folder
// IDs while preserving order for folders already in progress.
func mergeGraphFolderQueue(cursor map[string]any, discovered []string) []string {
existing := readGraphFolderQueue(cursor)
if len(existing) == 0 {
writeGraphFolderQueue(cursor, discovered)
return discovered
}
seen := make(map[string]struct{}, len(existing))
for _, id := range existing {
seen[id] = struct{}{}
}
merged := make([]string, len(existing), len(existing)+len(discovered))
copy(merged, existing)
for _, id := range discovered {
if id == "" {
continue
}
if _, ok := seen[id]; ok {
continue
}
merged = append(merged, id)
seen[id] = struct{}{}
}
if len(merged) != len(existing) {
writeGraphFolderQueue(cursor, merged)
}
return merged
}
func migrationContactPath(bookPath, provider, sourceID string) string {
uid := sanitizeMigrationUID(provider, sourceID)
return bookPath + uid + ".vcf"
}
func migrationEventPath(calPath, provider, sourceID string) string {
uid := sanitizeMigrationUID(provider, sourceID)
return calPath + uid + ".ics"
}
func sanitizeMigrationUID(provider, sourceID string) string {
sourceID = strings.TrimSpace(sourceID)
sourceID = strings.ReplaceAll(sourceID, "/", "-")
return provider + "-" + sourceID + "@ultimail.migrated"
}
func applyOAuthToken(cred credentials.Credential, token *oauthToken) credentials.Credential {
cred.AuthType = credentials.AuthOAuth2
cred.AccessToken = token.AccessToken
if token.RefreshToken != "" {
cred.RefreshToken = token.RefreshToken
}
if !token.Expiry.IsZero() {
cred.Expiry = token.Expiry.UTC()
}
return cred
}
type oauthToken struct {
AccessToken string
RefreshToken string
Expiry time.Time
}