ultisuite-backend/internal/migration/worker.go
R3D347HR4Y 7143a36c19
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
feat(mail): integrate Stalwart hosted mail and migration features
- Added configuration options for Stalwart hosted mail in .env.example.
- Updated Docker Compose to include Stalwart service with health checks.
- Introduced new API endpoints for managing mail domains and migration projects.
- Enhanced Authentik blueprints for user enrollment and post-migration security.
- Updated OAuth handling for Google and Microsoft migration processes.
- Improved error handling and response structures in the mail API.
- Added integration tests for email claiming and migration workflows.
2026-06-13 12:47:08 +02:00

286 lines
8.4 KiB
Go

package migration
import (
"context"
"fmt"
"log/slog"
"net/http"
"strings"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/credentials"
mailstorage "github.com/ultisuite/ulti-backend/internal/mail/storage"
"github.com/ultisuite/ulti-backend/internal/nextcloud"
"github.com/ultisuite/ulti-backend/internal/observability"
)
// Worker polls for pending migration jobs and runs each one through the
// importer matching the job's service (mail, contacts, calendar, drive).
type Worker struct {
	// Database pool and migration service used for project, invite and
	// credential lookups and for job status bookkeeping.
	db  *pgxpool.Pool
	svc *Service

	// Authentication back-ends for the source provider: per-user OAuth
	// credentials, Google domain-wide delegation, and Microsoft app-only auth.
	oauth        *OAuthService
	creds        *credentials.Manager
	googleDWD    *GoogleDWD
	microsoftApp *MicrosoftApp

	// Dependencies handed to the importers: the Nextcloud client goes to the
	// contacts, calendar and drive importers; the storage client and
	// attachment bucket go to the Gmail importer.
	nc           *nextcloud.Client
	storage      *mailstorage.Client
	attachBucket string

	// concurrency bounds how many jobs run at once within a tick; jobLimit
	// bounds how many pending jobs a tick requests.
	concurrency int
	jobLimit    int

	logger *slog.Logger
	// client is created by NewWorker with a 60s timeout.
	client *http.Client
}
// WorkerConfig carries the tunables for the migration worker: how many jobs
// run in parallel and how many pending jobs are requested per poll.
type WorkerConfig struct {
	// Concurrency caps the number of jobs processed at the same time.
	// NewWorker treats values <= 0 as 1.
	Concurrency int
	// JobLimit caps how many pending jobs one poll asks for. NewWorker
	// replaces values <= 0 with three times the concurrency, but at least 5.
	JobLimit int
}
// NewWorker wires a Worker from its dependencies and normalises the tuning
// values in cfg.
//
// A non-positive cfg.Concurrency becomes 1. A non-positive cfg.JobLimit
// becomes three times the effective concurrency, with a floor of 5. The worker
// logs through the default slog logger tagged component=migration-worker and
// gets an HTTP client with a 60 second timeout.
func NewWorker(db *pgxpool.Pool, svc *Service, oauth *OAuthService, creds *credentials.Manager, googleDWD *GoogleDWD, microsoftApp *MicrosoftApp, nc *nextcloud.Client, storage *mailstorage.Client, attachBucket string, cfg WorkerConfig) *Worker {
	workers := cfg.Concurrency
	if workers < 1 {
		workers = 1
	}

	limit := cfg.JobLimit
	if limit < 1 {
		// Default: scale with parallelism, never below 5.
		limit = 5
		if scaled := workers * 3; scaled > limit {
			limit = scaled
		}
	}

	worker := &Worker{
		db:           db,
		svc:          svc,
		oauth:        oauth,
		creds:        creds,
		googleDWD:    googleDWD,
		microsoftApp: microsoftApp,
		nc:           nc,
		storage:      storage,
		attachBucket: attachBucket,
		concurrency:  workers,
		jobLimit:     limit,
	}
	worker.logger = slog.Default().With("component", "migration-worker")
	worker.client = &http.Client{Timeout: 60 * time.Second}
	return worker
}
// Start runs the polling loop until ctx is cancelled, calling tick once per
// interval. A non-positive interval falls back to 30 seconds. Start blocks;
// the first tick happens only after one full interval has elapsed.
func (w *Worker) Start(ctx context.Context, interval time.Duration) {
	period := interval
	if period <= 0 {
		period = 30 * time.Second
	}

	poll := time.NewTicker(period)
	defer poll.Stop()

	stop := ctx.Done()
	for {
		select {
		case <-stop:
			return
		case <-poll.C:
			w.tick(ctx)
		}
	}
}
// tick performs one polling cycle: it fetches up to w.jobLimit pending jobs,
// publishes the fetched count as the pending-jobs metric, and processes the
// jobs with at most w.concurrency running at the same time. It returns once
// every job it started has finished.
//
// If ctx is cancelled while jobs are still waiting for a free slot, the
// remaining jobs are not started; jobs already running are still awaited.
func (w *Worker) tick(ctx context.Context) {
	jobs, err := w.svc.PendingJobs(ctx, w.jobLimit)
	if err != nil {
		w.logger.Error("list pending migration jobs", "error", err)
		return
	}
	observability.SetMigrationPendingJobs(len(jobs))
	if len(jobs) == 0 {
		return
	}

	// sem is a counting semaphore: a send acquires a slot, a receive frees it.
	sem := make(chan struct{}, w.concurrency)
	var wg sync.WaitGroup

dispatch:
	for _, job := range jobs {
		// select picks randomly when both cases are ready, so check for
		// cancellation first to stop dispatching deterministically.
		if ctx.Err() != nil {
			break dispatch
		}
		select {
		case <-ctx.Done():
			break dispatch
		case sem <- struct{}{}:
		}

		// Register with the wait group only once a slot is actually held.
		wg.Add(1)
		go func(job Job) {
			defer wg.Done()
			defer func() { <-sem }()
			if _, err := w.processJob(ctx, job); err != nil {
				w.logger.Error("migration job failed", "job_id", job.ID, "service", job.Service, "error", err)
			}
		}(job)
	}
	wg.Wait()
}
// processJob runs a single migration job and returns an outcome label
// ("completed", "pending", "rate_limited" or "failed") together with an error.
// The outcome and the elapsed time are always reported to the migration-job
// metric on return.
//
// Steps: mark the job "running", load the project's provider settings, obtain
// an access token, then hand the job to the importer for job.Service. The
// importers receive &job and the update callback, through which they persist
// their own progress and final status.
//
// Note that when an importer fails, the returned error is the result of
// persisting the new job status, not the importer error itself; the importer
// error is stored on the job record instead.
func (w *Worker) processJob(ctx context.Context, job Job) (string, error) {
	start := time.Now()
	outcome := "unknown"
	defer func() {
		observability.ObserveMigrationJob(job.Service, outcome, time.Since(start))
	}()

	if err := w.svc.UpdateJobProgress(ctx, job.ID, "running", job.CursorJSON, job.StatsJSON, ""); err != nil {
		outcome = "failed"
		return outcome, err
	}

	var (
		provider string
		delta    bool
		authMode string
	)
	err := w.db.QueryRow(ctx, `
SELECT p.source_provider, p.delta_mode, p.auth_mode
FROM migration_projects p WHERE p.id = $1::uuid
`, job.ProjectID).Scan(&provider, &delta, &authMode)
	if err != nil {
		// NOTE(review): the job was already marked "running" above and is not
		// moved to another status on this path — confirm that stale running
		// jobs are picked up again elsewhere.
		outcome = "failed"
		return outcome, fmt.Errorf("load migration project %s: %w", job.ProjectID, err)
	}

	accessToken, graphUserUPN, err := w.loadAccessToken(ctx, job, provider, authMode)
	if err != nil {
		outcome = "failed"
		// The token error is what the caller needs to see; a failure to
		// persist the status is logged rather than silently dropped.
		if updErr := w.svc.UpdateJobProgress(ctx, job.ID, "failed", job.CursorJSON, job.StatsJSON, err.Error()); updErr != nil {
			w.logger.Error("persist migration job failure", "job_id", job.ID, "error", updErr)
		}
		return outcome, err
	}

	// update is the progress callback given to importers; it remembers the
	// last status they reported so the outcome label can be derived below.
	var lastStatus string
	update := func(status string, cursor, stats map[string]any, jobErr string) error {
		lastStatus = status
		return w.svc.UpdateJobProgress(ctx, job.ID, status, cursor, stats, jobErr)
	}

	var procErr error
	switch job.Service {
	case "mail":
		if provider == "google" {
			procErr = NewGmailImporter(w.db).WithStorage(w.storage, w.attachBucket).ImportBatch(ctx, &job, accessToken, delta, update)
		} else {
			procErr = NewGraphImporter(w.db).WithUserPrincipal(graphUserUPN).ImportBatch(ctx, &job, accessToken, delta, update)
		}
	case "contacts":
		procErr = NewContactsImporter(w.db, w.nc).WithUserPrincipal(graphUserUPN).ImportBatch(ctx, &job, accessToken, provider, delta, update)
	case "calendar":
		procErr = NewCalendarImporter(w.db, w.nc).WithUserPrincipal(graphUserUPN).ImportBatch(ctx, &job, accessToken, provider, delta, update)
	case "drive":
		procErr = NewDriveImporter(w.db, w.nc).WithUserPrincipal(graphUserUPN).ImportBatch(ctx, &job, accessToken, provider, delta, update)
	default:
		procErr = fmt.Errorf("unknown service %q", job.Service)
	}

	if procErr != nil {
		if IsRateLimitError(procErr) {
			// Rate limits are not failures: flag the job in its stats and put
			// it back to "pending".
			if job.StatsJSON == nil {
				job.StatsJSON = map[string]any{}
			}
			job.StatsJSON["rate_limited"] = true
			job.StatsJSON["rate_limit_at"] = time.Now().UTC().Format(time.RFC3339)
			outcome = "rate_limited"
			return outcome, w.svc.UpdateJobProgress(ctx, job.ID, "pending", job.CursorJSON, job.StatsJSON, procErr.Error())
		}
		outcome = "failed"
		return outcome, w.svc.UpdateJobProgress(ctx, job.ID, "failed", job.CursorJSON, job.StatsJSON, procErr.Error())
	}

	// Every recognised service persists its own status through update, so
	// only the outcome label is derived here: "pending" if that was the last
	// status the importer reported, "completed" otherwise.
	outcome = "completed"
	if lastStatus == "pending" {
		outcome = "pending"
	}
	return outcome, nil
}
// loadAccessToken resolves the access token to use for job, based on the
// project's provider and auth mode.
//
//   - google + AuthModeGoogleDWD: requests a domain-wide-delegation token for
//     the user's invite email; graphUserUPN is empty.
//   - microsoft + AuthModeMicrosoftApp: requests an app-only token for the
//     project's tenant; graphUserUPN is the user's invite email.
//   - anything else: loads the stored OAuth credential, refreshing it first
//     when an OAuth service is configured and the credential reports that it
//     needs a refresh; graphUserUPN is empty.
func (w *Worker) loadAccessToken(ctx context.Context, job Job, provider, authMode string) (accessToken, graphUserUPN string, err error) {
	switch {
	case provider == "google" && authMode == AuthModeGoogleDWD:
		if w.googleDWD == nil || !w.googleDWD.Enabled() {
			return "", "", fmt.Errorf("google domain-wide delegation not configured")
		}
		mailbox, lookupErr := w.inviteEmail(ctx, job.ProjectID, job.UserID)
		if lookupErr != nil {
			return "", "", lookupErr
		}
		tok, tokErr := w.googleDWD.AccessToken(ctx, mailbox)
		return tok, "", tokErr

	case provider == "microsoft" && authMode == AuthModeMicrosoftApp:
		if w.microsoftApp == nil || !w.microsoftApp.Enabled() {
			return "", "", fmt.Errorf("microsoft app-only auth not configured")
		}
		tenant, tenantErr := w.projectMicrosoftTenant(ctx, job.ProjectID)
		if tenantErr != nil {
			return "", "", tenantErr
		}
		mailbox, lookupErr := w.inviteEmail(ctx, job.ProjectID, job.UserID)
		if lookupErr != nil {
			return "", "", lookupErr
		}
		tok, tokErr := w.microsoftApp.AccessToken(ctx, tenant)
		return tok, mailbox, tokErr
	}

	// Per-user OAuth credential stored for this project and provider.
	stored, loadErr := w.loadToken(ctx, job.UserID, job.ProjectID, provider)
	if loadErr != nil {
		return "", "", loadErr
	}
	if w.oauth == nil || !stored.NeedsRefresh() {
		return stored.AccessToken, "", nil
	}
	refreshed, refreshErr := w.svc.RefreshCredential(ctx, w.oauth, job.UserID, job.ProjectID, provider, stored)
	if refreshErr != nil {
		return "", "", refreshErr
	}
	return refreshed.AccessToken, "", nil
}
// projectMicrosoftTenant looks up the Microsoft tenant id stored on the
// migration project. It returns an error when the lookup fails or when the
// stored value is empty or whitespace-only.
func (w *Worker) projectMicrosoftTenant(ctx context.Context, projectID string) (string, error) {
	const query = `
SELECT COALESCE(NULLIF(microsoft_tenant_id, ''), '')
FROM migration_projects WHERE id = $1::uuid
`
	var tenant string
	if err := w.db.QueryRow(ctx, query, projectID).Scan(&tenant); err != nil {
		return "", fmt.Errorf("migration project tenant lookup: %w", err)
	}
	if strings.TrimSpace(tenant) == "" {
		return "", fmt.Errorf("microsoft tenant id missing: complete admin consent first")
	}
	return tenant, nil
}
// inviteEmail returns the email address from the user's invite for the given
// project. When several invites match, the one with the latest claimed_at is
// used, and unclaimed invites sort last.
//
// The underlying query error is wrapped with %w so callers and logs can tell
// a missing invite apart from a database failure.
func (w *Worker) inviteEmail(ctx context.Context, projectID, userID string) (string, error) {
	var email string
	err := w.db.QueryRow(ctx, `
SELECT email FROM migration_invites
WHERE project_id = $1::uuid AND user_id = $2::uuid
ORDER BY claimed_at DESC NULLS LAST LIMIT 1
`, projectID, userID).Scan(&email)
	if err != nil {
		return "", fmt.Errorf("migration invite email missing for domain-wide delegation: %w", err)
	}
	return email, nil
}
// loadToken reads the non-revoked encrypted credential stored for the given
// user, project and provider, decrypts it, and returns it marked as an OAuth2
// credential for that provider.
//
// Query and decryption errors are wrapped with %w so the root cause (no row
// versus a database or decryption failure) stays visible to callers and logs.
func (w *Worker) loadToken(ctx context.Context, userID, projectID, provider string) (credentials.Credential, error) {
	var blob []byte
	err := w.db.QueryRow(ctx, `
SELECT encrypted_token FROM migration_credentials
WHERE user_id = $1::uuid AND project_id = $2::uuid AND provider = $3 AND revoked_at IS NULL
`, userID, projectID, provider).Scan(&blob)
	if err != nil {
		return credentials.Credential{}, fmt.Errorf("migration credentials missing: run OAuth consent first: %w", err)
	}
	cred, err := w.creds.DecryptCredential(blob)
	if err != nil {
		return credentials.Credential{}, fmt.Errorf("decrypt migration credential: %w", err)
	}
	cred.AuthType = credentials.AuthOAuth2
	cred.OAuthProvider = provider
	return cred, nil
}