package migration

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/ultisuite/ulti-backend/internal/mail/credentials"
	mailstorage "github.com/ultisuite/ulti-backend/internal/mail/storage"
	"github.com/ultisuite/ulti-backend/internal/nextcloud"
	"github.com/ultisuite/ulti-backend/internal/observability"
)

// Worker periodically picks up pending migration jobs and runs the importer
// matching each job's service (mail, contacts, calendar, drive).
type Worker struct {
	db           *pgxpool.Pool
	svc          *Service
	oauth        *OAuthService
	creds        *credentials.Manager
	googleDWD    *GoogleDWD
	microsoftApp *MicrosoftApp
	nc           *nextcloud.Client
	storage      *mailstorage.Client
	attachBucket string
	concurrency  int
	jobLimit     int
	logger       *slog.Logger
	client       *http.Client
}

// WorkerConfig tunes migration worker parallelism and job pickup.
type WorkerConfig struct {
	Concurrency int
	JobLimit    int
}

// NewWorker builds a Worker from its dependencies.
//
// A non-positive cfg.Concurrency falls back to 1. A non-positive cfg.JobLimit
// falls back to three times the concurrency, with a floor of 5.
func NewWorker(db *pgxpool.Pool, svc *Service, oauth *OAuthService, creds *credentials.Manager, googleDWD *GoogleDWD, microsoftApp *MicrosoftApp, nc *nextcloud.Client, storage *mailstorage.Client, attachBucket string, cfg WorkerConfig) *Worker {
	concurrency := cfg.Concurrency
	if concurrency <= 0 {
		concurrency = 1
	}
	jobLimit := cfg.JobLimit
	if jobLimit <= 0 {
		jobLimit = max(concurrency*3, 5)
	}
	return &Worker{
		db:           db,
		svc:          svc,
		oauth:        oauth,
		creds:        creds,
		googleDWD:    googleDWD,
		microsoftApp: microsoftApp,
		nc:           nc,
		storage:      storage,
		attachBucket: attachBucket,
		concurrency:  concurrency,
		jobLimit:     jobLimit,
		logger:       slog.Default().With("component", "migration-worker"),
		client:       &http.Client{Timeout: 60 * time.Second},
	}
}

// Start runs one tick per interval and blocks until ctx is cancelled.
// A non-positive interval falls back to 30 seconds.
func (w *Worker) Start(ctx context.Context, interval time.Duration) {
	if interval <= 0 {
		interval = 30 * time.Second
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			w.tick(ctx)
		}
	}
}

// tick fetches up to jobLimit pending jobs and processes them with at most
// w.concurrency jobs in flight. It returns once every dispatched job has
// finished. Dispatching stops early when ctx is cancelled.
func (w *Worker) tick(ctx context.Context) {
	jobs, err := w.svc.PendingJobs(ctx, w.jobLimit)
	if err != nil {
		w.logger.Error("list pending migration jobs", "error", err)
		return
	}
	observability.SetMigrationPendingJobs(len(jobs))
	if len(jobs) == 0 {
		return
	}

	sem := make(chan struct{}, w.concurrency)
	var wg sync.WaitGroup

dispatch:
	for _, job := range jobs {
		// Checked explicitly first: select picks randomly when both the
		// cancellation and a free slot are ready.
		if ctx.Err() != nil {
			break
		}
		select {
		case <-ctx.Done():
			// Do not block on a slot (or start new work) during shutdown.
			break dispatch
		case sem <- struct{}{}:
		}

		wg.Add(1)
		go func(job Job) {
			defer wg.Done()
			defer func() { <-sem }()
			if _, err := w.processJob(ctx, job); err != nil {
				w.logger.Error("migration job failed", "job_id", job.ID, "service", job.Service, "error", err)
			}
		}(job)
	}
	wg.Wait()
}

// processJob runs a single migration job and returns the outcome label that
// is also reported to the job metrics ("completed", "pending", "failed",
// "rate_limited").
//
// The returned error is non-nil when the job could not be started, when the
// access token could not be obtained, or when persisting the final status
// failed. An importer failure whose status was persisted successfully is
// logged here and reported through the outcome, not through the error.
func (w *Worker) processJob(ctx context.Context, job Job) (string, error) {
	start := time.Now()
	outcome := "unknown"
	// The closure reads outcome at return time, so every exit path below
	// must set it before returning.
	defer func() {
		observability.ObserveMigrationJob(job.Service, outcome, time.Since(start))
	}()

	if err := w.svc.UpdateJobProgress(ctx, job.ID, "running", job.CursorJSON, job.StatsJSON, ""); err != nil {
		outcome = "failed"
		return outcome, err
	}

	var (
		provider string
		delta    bool
		authMode string
	)
	err := w.db.QueryRow(ctx, `
		SELECT p.source_provider, p.delta_mode, p.auth_mode
		FROM migration_projects p
		WHERE p.id = $1::uuid
	`, job.ProjectID).Scan(&provider, &delta, &authMode)
	if err != nil {
		// NOTE(review): the job was already moved to "running" above and is
		// not reset here; confirm PendingJobs re-selects such jobs.
		outcome = "failed"
		return outcome, fmt.Errorf("load migration project: %w", err)
	}

	accessToken, graphUserUPN, err := w.loadAccessToken(ctx, job, provider, authMode)
	if err != nil {
		outcome = "failed"
		// Best effort: the token error is what the caller needs to see, but a
		// failed status write must not vanish silently.
		if updErr := w.svc.UpdateJobProgress(ctx, job.ID, "failed", job.CursorJSON, job.StatsJSON, err.Error()); updErr != nil {
			w.logger.Error("persist migration job failure", "job_id", job.ID, "error", updErr)
		}
		return outcome, err
	}

	// update is handed to the importers; it records the last status they
	// reported so the outcome metric can reflect it.
	var lastStatus string
	update := func(status string, cursor, stats map[string]any, jobErr string) error {
		lastStatus = status
		return w.svc.UpdateJobProgress(ctx, job.ID, status, cursor, stats, jobErr)
	}

	var (
		procErr     error
		selfManaged bool
	)
	switch job.Service {
	case "mail":
		selfManaged = true
		if provider == "google" {
			procErr = NewGmailImporter(w.db).
				WithStorage(w.storage, w.attachBucket).
				ImportBatch(ctx, &job, accessToken, delta, update)
		} else {
			procErr = NewGraphImporter(w.db).
				WithUserPrincipal(graphUserUPN).
				ImportBatch(ctx, &job, accessToken, delta, update)
		}
	case "contacts":
		selfManaged = true
		procErr = NewContactsImporter(w.db, w.nc).
			WithUserPrincipal(graphUserUPN).
			ImportBatch(ctx, &job, accessToken, provider, delta, update)
	case "calendar":
		selfManaged = true
		procErr = NewCalendarImporter(w.db, w.nc).
			WithUserPrincipal(graphUserUPN).
			ImportBatch(ctx, &job, accessToken, provider, delta, update)
	case "drive":
		selfManaged = true
		// NOTE(review): both early returns below leave the job in "running";
		// see the note on the project lookup above.
		sharedMode, modeErr := w.projectSharedDriveMode(ctx, job.ProjectID)
		if modeErr != nil {
			outcome = "failed"
			return outcome, modeErr
		}
		dedup, dedupErr := LoadSharedDriveItemStore(ctx, w.db, job.ProjectID)
		if dedupErr != nil {
			outcome = "failed"
			return outcome, dedupErr
		}
		procErr = NewDriveImporter(w.db, w.nc).
			WithUserPrincipal(graphUserUPN).
			WithProject(job.ProjectID, sharedMode, dedup).
			ImportBatch(ctx, &job, accessToken, provider, delta, update)
	default:
		procErr = fmt.Errorf("unknown service %q", job.Service)
	}

	if procErr != nil {
		if IsRateLimitError(procErr) {
			if job.StatsJSON == nil {
				job.StatsJSON = map[string]any{}
			}
			job.StatsJSON["rate_limited"] = true
			job.StatsJSON["rate_limit_at"] = time.Now().UTC().Format(time.RFC3339)
			outcome = "rate_limited"
			// Back to "pending" so a later tick picks the job up again.
			return outcome, w.svc.UpdateJobProgress(ctx, job.ID, "pending", job.CursorJSON, job.StatsJSON, procErr.Error())
		}
		outcome = "failed"
		// The return value only carries the status-write error, so log the
		// importer error here or it never reaches the logs.
		w.logger.Error("migration import failed", "job_id", job.ID, "service", job.Service, "error", procErr)
		return outcome, w.svc.UpdateJobProgress(ctx, job.ID, "failed", job.CursorJSON, job.StatsJSON, procErr.Error())
	}

	if selfManaged {
		// The importer already wrote its status through update; only
		// "pending" is distinguished, anything else counts as completed.
		outcome = "completed"
		if lastStatus == "pending" {
			outcome = "pending"
		}
		return outcome, nil
	}

	// Not reachable with the services handled above (each one sets
	// selfManaged, and unknown services set procErr); kept as a fallback.
	outcome = "completed"
	return outcome, w.svc.UpdateJobProgress(ctx, job.ID, "completed", job.CursorJSON, job.StatsJSON, "")
}

// loadAccessToken returns the access token for the job's source provider.
//
// Google domain-wide delegation and Microsoft app-only auth mint a token
// without a stored user credential. Every other combination loads the stored
// OAuth credential and refreshes it when needed. graphUserUPN is set only for
// Microsoft app-only auth, where it is the user's invite email.
func (w *Worker) loadAccessToken(ctx context.Context, job Job, provider, authMode string) (accessToken, graphUserUPN string, err error) {
	if provider == "google" && authMode == AuthModeGoogleDWD {
		if w.googleDWD == nil || !w.googleDWD.Enabled() {
			return "", "", fmt.Errorf("google domain-wide delegation not configured")
		}
		email, err := w.inviteEmail(ctx, job.ProjectID, job.UserID)
		if err != nil {
			return "", "", err
		}
		token, err := w.googleDWD.AccessToken(ctx, email)
		return token, "", err
	}

	if provider == "microsoft" && authMode == AuthModeMicrosoftApp {
		if w.microsoftApp == nil || !w.microsoftApp.Enabled() {
			return "", "", fmt.Errorf("microsoft app-only auth not configured")
		}
		tenantID, err := w.projectMicrosoftTenant(ctx, job.ProjectID)
		if err != nil {
			return "", "", err
		}
		email, err := w.inviteEmail(ctx, job.ProjectID, job.UserID)
		if err != nil {
			return "", "", err
		}
		token, err := w.microsoftApp.AccessToken(ctx, tenantID)
		return token, email, err
	}

	cred, err := w.loadToken(ctx, job.UserID, job.ProjectID, provider)
	if err != nil {
		return "", "", err
	}
	if w.oauth != nil && cred.NeedsRefresh() {
		cred, err = w.svc.RefreshCredential(ctx, w.oauth, job.UserID, job.ProjectID, provider, cred)
		if err != nil {
			return "", "", err
		}
	}
	return cred.AccessToken, "", nil
}

// projectMicrosoftTenant returns the project's Microsoft tenant id with
// surrounding whitespace removed. It fails when the id is empty or blank.
func (w *Worker) projectMicrosoftTenant(ctx context.Context, projectID string) (string, error) {
	var tenantID string
	err := w.db.QueryRow(ctx, `
		SELECT COALESCE(NULLIF(microsoft_tenant_id, ''), '')
		FROM migration_projects
		WHERE id = $1::uuid
	`, projectID).Scan(&tenantID)
	if err != nil {
		return "", fmt.Errorf("migration project tenant lookup: %w", err)
	}
	tenantID = strings.TrimSpace(tenantID)
	if tenantID == "" {
		return "", fmt.Errorf("microsoft tenant id missing: complete admin consent first")
	}
	return tenantID, nil
}

// projectSharedDriveMode returns the project's normalized shared-drive mode.
// A NULL column reads as "auto".
func (w *Worker) projectSharedDriveMode(ctx context.Context, projectID string) (string, error) {
	var mode string
	err := w.db.QueryRow(ctx, `
		SELECT COALESCE(shared_drive_mode, 'auto')
		FROM migration_projects
		WHERE id = $1::uuid
	`, projectID).Scan(&mode)
	if err != nil {
		return "", fmt.Errorf("migration project shared drive mode lookup: %w", err)
	}
	return NormalizeSharedDriveMode(mode), nil
}

// inviteEmail returns the email of the user's most recently claimed invite
// for the project.
func (w *Worker) inviteEmail(ctx context.Context, projectID, userID string) (string, error) {
	var email string
	err := w.db.QueryRow(ctx, `
		SELECT email
		FROM migration_invites
		WHERE project_id = $1::uuid AND user_id = $2::uuid
		ORDER BY claimed_at DESC NULLS LAST
		LIMIT 1
	`, projectID, userID).Scan(&email)
	if err != nil {
		// The wording is kept for compatibility even though the Microsoft
		// app-only path uses this lookup too; the cause is now attached.
		return "", fmt.Errorf("migration invite email missing for domain-wide delegation: %w", err)
	}
	return email, nil
}

// loadToken reads and decrypts the user's non-revoked stored credential for
// the project and provider, and marks it as an OAuth2 credential for that
// provider.
func (w *Worker) loadToken(ctx context.Context, userID, projectID, provider string) (credentials.Credential, error) {
	var blob []byte
	err := w.db.QueryRow(ctx, `
		SELECT encrypted_token
		FROM migration_credentials
		WHERE user_id = $1::uuid AND project_id = $2::uuid AND provider = $3 AND revoked_at IS NULL
	`, userID, projectID, provider).Scan(&blob)
	if err != nil {
		return credentials.Credential{}, fmt.Errorf("migration credentials missing: run OAuth consent first: %w", err)
	}
	cred, err := w.creds.DecryptCredential(blob)
	if err != nil {
		return credentials.Credential{}, fmt.Errorf("decrypt migration credential: %w", err)
	}
	cred.AuthType = credentials.AuthOAuth2
	cred.OAuthProvider = provider
	return cred, nil
}