// Package imap contains a background worker that periodically copies the
// contents of remote IMAP mailboxes into the local database.
package imap

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net"
	"strconv"
	"strings"
	"time"
	"unicode/utf8"

	"github.com/emersion/go-imap/v2"
	"github.com/emersion/go-imap/v2/imapclient"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/ultisuite/ulti-backend/internal/mail/credentials"
	"github.com/ultisuite/ulti-backend/internal/observability"
)

// SyncWorker periodically synchronizes every active mail account from its
// IMAP server into the database.
type SyncWorker struct {
	db          *pgxpool.Pool
	logger      *slog.Logger
	interval    time.Duration
	credentials *credentials.Manager
}

// NewSyncWorker returns a SyncWorker that runs one sync cycle every interval.
// The interval must be positive: Start passes it to time.NewTicker, which
// panics on a non-positive duration. credManager may be nil, in which case
// every account fails with "credential manager not configured".
func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *credentials.Manager) *SyncWorker {
	return &SyncWorker{
		db:          db,
		logger:      slog.Default().With("component", "imap-sync"),
		interval:    interval,
		credentials: credManager,
	}
}

// Start runs one sync cycle immediately and then one per tick. It blocks
// until ctx is cancelled.
func (w *SyncWorker) Start(ctx context.Context) {
	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()

	w.logger.Info("imap sync worker started", "interval", w.interval)

	// Initial sync, so callers do not have to wait a full interval.
	w.runSyncCycle(ctx)

	for {
		select {
		case <-ctx.Done():
			w.logger.Info("imap sync worker stopped")
			return
		case <-ticker.C:
			w.runSyncCycle(ctx)
		}
	}
}

// runSyncCycle performs a single pass over all accounts, records its outcome
// and duration as a metric, and logs the error when the pass fails.
func (w *SyncWorker) runSyncCycle(ctx context.Context) {
	start := time.Now()
	if err := w.syncAllAccounts(ctx); err != nil {
		observability.ObserveIMAPSync("error", time.Since(start))
		w.logger.Error("sync cycle failed", "error", err)
		return
	}
	observability.ObserveIMAPSync("success", time.Since(start))
}

// syncAllAccounts loads every active account and synchronizes them one after
// another. A failure in one account is logged and does not stop the others;
// the returned error only reports that something failed. It returns early
// with ctx.Err() once ctx is cancelled.
func (w *SyncWorker) syncAllAccounts(ctx context.Context) error {
	type account struct {
		id        string
		host      string
		port      int
		useTLS    bool
		creds     []byte
		syncState []byte
	}

	rows, err := w.db.Query(ctx, `
		SELECT id, imap_host, imap_port, imap_tls, credentials, sync_state
		FROM mail_accounts
		WHERE is_active = true
	`)
	if err != nil {
		return fmt.Errorf("query accounts: %w", err)
	}

	// Read the full account list before doing any network work: open rows
	// pin a pool connection, and syncAccount needs further connections from
	// the same pool for its own statements.
	var accounts []account
	failed := false
	for rows.Next() {
		var a account
		if err := rows.Scan(&a.id, &a.host, &a.port, &a.useTLS, &a.creds, &a.syncState); err != nil {
			w.logger.Error("failed to scan account", "error", err)
			failed = true
			continue
		}
		accounts = append(accounts, a)
	}
	rows.Close()
	rowsErr := rows.Err()

	for _, a := range accounts {
		// Stop promptly on shutdown instead of dialing the remaining servers.
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := w.syncAccount(ctx, a.id, a.host, a.port, a.useTLS, a.creds, a.syncState); err != nil {
			w.logger.Error("sync failed", "account_id", a.id, "error", err)
			failed = true
		}
	}

	if rowsErr != nil {
		return fmt.Errorf("iterate accounts: %w", rowsErr)
	}
	if failed {
		return errors.New("one or more account sync failed")
	}
	return nil
}

// syncAccount connects to one account's IMAP server, synchronizes every
// mailbox the server lists, and stamps the account's last_sync_at.
//
// useTLS selects an implicit-TLS connection; otherwise the connection is
// upgraded with STARTTLS. Per-folder failures are logged and skipped, so
// last_sync_at is updated even when some folders failed. syncState is
// accepted but currently unused.
func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, port int, useTLS bool, creds, syncState []byte) error {
	// Decrypt first so unusable credentials never cost a network connection.
	username, password, err := w.parseCredentials(creds)
	if err != nil {
		return fmt.Errorf("decrypt credentials: %w", err)
	}

	// JoinHostPort brackets IPv6 literals, which plain "%s:%d" does not.
	addr := net.JoinHostPort(host, strconv.Itoa(port))

	var client *imapclient.Client
	opts := &imapclient.Options{}
	if useTLS {
		client, err = imapclient.DialTLS(addr, opts)
	} else {
		client, err = imapclient.DialStartTLS(addr, opts)
	}
	if err != nil {
		return fmt.Errorf("dial: %w", err)
	}
	defer client.Close()

	if err := client.Login(username, password).Wait(); err != nil {
		return fmt.Errorf("login: %w", err)
	}

	mailboxes, err := client.List("", "*", nil).Collect()
	if err != nil {
		return fmt.Errorf("list: %w", err)
	}

	for _, mbox := range mailboxes {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := w.syncFolder(ctx, client, accountID, mbox.Mailbox); err != nil {
			w.logger.Error("folder sync failed", "account_id", accountID, "folder", mbox.Mailbox, "error", err)
		}
	}

	if _, err := w.db.Exec(ctx, `UPDATE mail_accounts SET last_sync_at = NOW() WHERE id = $1`, accountID); err != nil {
		return fmt.Errorf("update last sync: %w", err)
	}
	return nil
}

// syncFolder selects folderName, upserts its row in mail_folders, and stores
// every message whose UID is higher than the highest UID already stored for
// that folder. Failures on individual messages are logged and skipped.
//
// NOTE(review): the stored uidvalidity is overwritten without being compared
// to the previous value, so a UIDVALIDITY change on the server is not
// detected and previously stored UIDs are kept as they are.
func (w *SyncWorker) syncFolder(ctx context.Context, client *imapclient.Client, accountID, folderName string) error {
	selectData, err := client.Select(folderName, nil).Wait()
	if err != nil {
		return fmt.Errorf("select %s: %w", folderName, err)
	}

	// Upsert the folder record and get its ID.
	var folderID string
	err = w.db.QueryRow(ctx, `
		INSERT INTO mail_folders (account_id, name, remote_name, uidvalidity, message_count)
		VALUES ($1, $2, $2, $3, $4)
		ON CONFLICT (account_id, remote_name) DO UPDATE SET
			uidvalidity = EXCLUDED.uidvalidity,
			message_count = EXCLUDED.message_count,
			updated_at = NOW()
		RETURNING id
	`, accountID, folderName, selectData.UIDValidity, selectData.NumMessages).Scan(&folderID)
	if err != nil {
		return fmt.Errorf("upsert folder: %w", err)
	}

	if selectData.NumMessages == 0 {
		return nil
	}

	// Highest UID already stored for this folder. A failure here must not be
	// mistaken for "nothing stored yet", which would re-download the folder.
	var lastUID uint32
	err = w.db.QueryRow(ctx, `
		SELECT COALESCE(MAX(uid), 0) FROM messages WHERE folder_id = $1
	`, folderID).Scan(&lastUID)
	if err != nil {
		return fmt.Errorf("query last uid: %w", err)
	}

	// Fetch lastUID+1:* (a zero stop value stands for "*"). The server may
	// still return the newest message when nothing is new; the insert's
	// ON CONFLICT DO NOTHING clause makes that a no-op.
	uidSet := imap.UIDSet{}
	uidSet.AddRange(imap.UID(lastUID+1), imap.UID(0))

	fetchOpts := &imap.FetchOptions{
		UID:      true,
		Flags:    true,
		Envelope: true,
		// Peek keeps the fetch from setting \Seen: a background sync must
		// not mark the user's mail as read on the server.
		BodySection: []*imap.FetchItemBodySection{{Peek: true}},
	}

	fetchCmd := client.Fetch(uidSet, fetchOpts)
	for {
		msg := fetchCmd.Next()
		if msg == nil {
			break
		}
		if err := w.processMessage(ctx, msg, accountID, folderID); err != nil {
			w.logger.Error("process message failed", "folder", folderName, "error", err)
		}
	}

	return fetchCmd.Close()
}

// processMessage reads the items of one fetched message and inserts it into
// the messages table. A message without an envelope is skipped, and an
// already stored (folder_id, uid) pair is left untouched. It returns an error
// when the body cannot be read completely or the insert fails.
func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMessageData, accountID, folderID string) error {
	var (
		envelope    *imap.Envelope
		uid         imap.UID
		flags       []imap.Flag
		bodyContent []byte
	)

	for {
		item := msg.Next()
		if item == nil {
			break
		}
		switch data := item.(type) {
		case imapclient.FetchItemDataUID:
			uid = data.UID
		case imapclient.FetchItemDataFlags:
			flags = data.Flags
		case imapclient.FetchItemDataEnvelope:
			envelope = data.Envelope
		case imapclient.FetchItemDataBodySection:
			if data.Literal == nil {
				break
			}
			// Refuse to store a partially read body: the insert below never
			// updates an existing row, so a truncated body would be permanent.
			body, err := io.ReadAll(data.Literal)
			if err != nil {
				return fmt.Errorf("read body: %w", err)
			}
			bodyContent = body
		}
	}

	if envelope == nil {
		return nil
	}

	// Always a non-nil slice, even when the message carries no flags.
	flagStrs := make([]string, len(flags))
	for i, f := range flags {
		flagStrs[i] = string(f)
	}

	fromAddr := addressesToJSON(envelope.From)
	toAddrs := addressesToJSON(envelope.To)
	ccAddrs := addressesToJSON(envelope.Cc)

	bodyText, bodyHTML := parseBody(bodyContent)
	snippet := truncate(bodyText, 200)

	_, err := w.db.Exec(ctx, `
		INSERT INTO messages (account_id, folder_id, uid, message_id, subject, from_addr, to_addrs, cc_addrs, date, snippet, body_text, body_html, flags, in_reply_to)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
		ON CONFLICT (folder_id, uid) DO NOTHING
	`, accountID, folderID, uid, envelope.MessageID, envelope.Subject,
		fromAddr, toAddrs, ccAddrs, envelope.Date, snippet,
		bodyText, bodyHTML, flagStrs, strings.Join(envelope.InReplyTo, " "))
	return err
}

// parseCredentials decrypts a stored credential blob into a username and a
// password. It rejects empty blobs, blobs that are not encrypted, and a
// worker that has no credential manager.
func (w *SyncWorker) parseCredentials(creds []byte) (string, string, error) {
	if len(creds) == 0 {
		return "", "", errors.New("missing credentials")
	}
	if !credentials.IsEncrypted(creds) {
		return "", "", errors.New("plaintext credentials forbidden")
	}
	if w.credentials == nil {
		return "", "", errors.New("credential manager not configured")
	}
	return w.credentials.Decrypt(creds)
}

// splitBytes splits data at every occurrence of sep. The separators are not
// included, the parts alias data's backing array, and the result always has
// at least one element (for empty data, a single empty part).
func splitBytes(data []byte, sep byte) [][]byte {
	var parts [][]byte
	start := 0
	for i, b := range data {
		if b == sep {
			parts = append(parts, data[start:i])
			start = i + 1
		}
	}
	parts = append(parts, data[start:])
	return parts
}

// truncate returns s cut to at most maxLen bytes. The cut is moved back to
// the start of a rune so that a multi-byte UTF-8 character is never split;
// the result may therefore be slightly shorter than maxLen. A non-positive
// maxLen yields the empty string.
func truncate(s string, maxLen int) string {
	if maxLen <= 0 {
		return ""
	}
	if len(s) <= maxLen {
		return s
	}
	cut := maxLen
	// s[cut] is the first byte that gets dropped; if it is a continuation
	// byte, the rune it belongs to must be dropped as a whole.
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut]
}