package imap import ( "bytes" "context" "errors" "fmt" "log/slog" "strings" "time" "github.com/emersion/go-imap/v2" "github.com/emersion/go-imap/v2/imapclient" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/ultisuite/ulti-backend/internal/mail/credentials" "github.com/ultisuite/ulti-backend/internal/mail/connect" mailoauth "github.com/ultisuite/ulti-backend/internal/mail/oauth" "github.com/ultisuite/ulti-backend/internal/mail/limits" "github.com/ultisuite/ulti-backend/internal/mail/rules" "github.com/ultisuite/ulti-backend/internal/mail/storage" "github.com/ultisuite/ulti-backend/internal/mail/threading" "github.com/ultisuite/ulti-backend/internal/observability" "github.com/ultisuite/ulti-backend/internal/realtime" ) // SyncDeps optional services wired into the IMAP sync worker. type SyncDeps struct { Storage *storage.Client AttachBucket string Rules *rules.Engine Hub *realtime.Hub } type SyncWorker struct { db *pgxpool.Pool logger *slog.Logger interval time.Duration credentials *credentials.Manager oauth *mailoauth.Service storage *storage.Client attachBucket string pipeline *syncPipeline } func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *credentials.Manager, oauthSvc *mailoauth.Service, deps SyncDeps) *SyncWorker { return &SyncWorker{ db: db, logger: slog.Default().With("component", "imap-sync"), interval: interval, credentials: credManager, oauth: oauthSvc, storage: deps.Storage, attachBucket: deps.AttachBucket, pipeline: newSyncPipeline(db, deps.Rules, deps.Hub), } } func (w *SyncWorker) Start(ctx context.Context) { ticker := time.NewTicker(w.interval) defer ticker.Stop() w.logger.Info("imap sync worker started", "interval", w.interval) w.runSyncCycle(ctx) for { select { case <-ctx.Done(): w.logger.Info("imap sync worker stopped") return case <-ticker.C: w.runSyncCycle(ctx) } } } func (w *SyncWorker) runSyncCycle(ctx context.Context) { start := time.Now() if err := w.syncAllAccounts(ctx); err != nil { observability.ObserveIMAPSync("error", time.Since(start)) w.logger.Error("sync cycle failed", "error", err) return } observability.ObserveIMAPSync("success", time.Since(start)) } func (w *SyncWorker) syncAllAccounts(ctx context.Context) error { rows, err := w.db.Query(ctx, ` SELECT id, imap_host, imap_port, imap_tls, credentials FROM mail_accounts WHERE is_active = true `) if err != nil { return err } defer rows.Close() hasSyncError := false for rows.Next() { var ( accountID string host string port int useTLS bool creds []byte ) if err := rows.Scan(&accountID, &host, &port, &useTLS, &creds); err != nil { w.logger.Error("failed to scan account", "error", err) hasSyncError = true continue } if err := w.syncAccount(ctx, accountID, host, port, useTLS, creds); err != nil { w.logger.Error("sync failed", "account_id", accountID, "error", err) hasSyncError = true } } if err := rows.Err(); err != nil { return err } if hasSyncError { return errors.New("one or more account sync failed") } return nil } func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, port int, useTLS bool, creds []byte) error { userID, err := w.accountUserID(ctx, accountID) if err != nil { return err } addr := fmt.Sprintf("%s:%d", host, port) opts := &imapclient.Options{} var client *imapclient.Client if useTLS { client, err = imapclient.DialTLS(addr, opts) } else { client, err = imapclient.DialStartTLS(addr, opts) } if err != nil { return fmt.Errorf("dial: %w", err) } defer client.Close() cred, err := w.resolveCredential(ctx, accountID, creds) if err != nil { return fmt.Errorf("decrypt credentials: %w", err) } if err := connect.AuthenticateIMAP(client, cred); err != nil { return fmt.Errorf("login: %w", err) } listOpts := &imap.ListOptions{ReturnSpecialUse: true} mailboxes, err := client.List("", "*", listOpts).Collect() if err != nil { return fmt.Errorf("list: %w", err) } for _, mbox := range mailboxes { folderType := DetectFolderType(mbox.Mailbox, mbox.Attrs) if folderType == "" { continue } if err := w.syncFolder(ctx, client, accountID, userID, mbox.Mailbox, folderType); err != nil { w.logger.Error("folder sync failed", "account_id", accountID, "folder", mbox.Mailbox, "error", err) } } _, err = w.db.Exec(ctx, `UPDATE mail_accounts SET last_sync_at = NOW() WHERE id = $1`, accountID) return err } func (w *SyncWorker) accountUserID(ctx context.Context, accountID string) (string, error) { var userID string err := w.db.QueryRow(ctx, `SELECT user_id FROM mail_accounts WHERE id = $1`, accountID).Scan(&userID) return userID, err } func (w *SyncWorker) syncFolder(ctx context.Context, client *imapclient.Client, accountID, userID, folderName, folderType string) error { selectData, err := client.Select(folderName, &imap.SelectOptions{CondStore: true}).Wait() if err != nil { return fmt.Errorf("select %s: %w", folderName, err) } displayName := DisplayName(folderName, folderType) prevState, hasPrev, err := loadFolderSyncState(ctx, w.db, accountID, folderName) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("load folder state: %w", err) } var folderID string if hasPrev && prevState.UIDValidity != 0 && prevState.UIDValidity != selectData.UIDValidity { if err := resetFolderMessages(ctx, w.db, prevState.FolderID); err != nil { return fmt.Errorf("reset folder after uidvalidity change: %w", err) } prevState.LastUID = 0 prevState.HighestModSeq = 0 } err = w.db.QueryRow(ctx, ` INSERT INTO mail_folders (account_id, name, remote_name, folder_type, uidvalidity, highest_modseq, message_count, last_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (account_id, remote_name) DO UPDATE SET name = EXCLUDED.name, folder_type = EXCLUDED.folder_type, uidvalidity = EXCLUDED.uidvalidity, highest_modseq = GREATEST(mail_folders.highest_modseq, EXCLUDED.highest_modseq), message_count = EXCLUDED.message_count, updated_at = NOW() RETURNING id, last_uid, highest_modseq `, accountID, displayName, folderName, folderType, selectData.UIDValidity, selectData.HighestModSeq, selectData.NumMessages, prevState.LastUID, ).Scan(&folderID, &prevState.LastUID, &prevState.HighestModSeq) if err != nil { return fmt.Errorf("upsert folder: %w", err) } prevState.FolderID = folderID prevState.UIDValidity = selectData.UIDValidity if selectData.NumMessages == 0 { return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, 0, 0) } lastUID := prevState.LastUID if lastUID > 0 { if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, lastUID+1, 0, false); err != nil { return err } } else { if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, 1, 0, false); err != nil { return err } } if selectData.HighestModSeq > 0 && prevState.HighestModSeq > 0 && selectData.HighestModSeq > prevState.HighestModSeq { if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, 1, prevState.HighestModSeq, true); err != nil { w.logger.Warn("condstore incremental fetch failed", "folder", folderName, "error", err) } } if err := w.reconcileDeletions(ctx, client, folderID, userID, accountID); err != nil { w.logger.Warn("deletion reconcile failed", "folder", folderName, "error", err) } var maxUID uint32 _ = w.db.QueryRow(ctx, `SELECT COALESCE(MAX(uid), 0) FROM messages WHERE folder_id = $1`, folderID).Scan(&maxUID) return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, maxUID, int(selectData.NumMessages)) } func (w *SyncWorker) fetchAndProcess(ctx context.Context, client *imapclient.Client, accountID, userID, folderID string, fromUID uint32, changedSince uint64, updatesOnly bool) error { seqSet := imap.UIDSet{} seqSet.AddRange(imap.UID(fromUID), imap.UID(0)) fetchOpts := &imap.FetchOptions{ UID: true, Flags: true, Envelope: true, ModSeq: changedSince > 0, BodySection: []*imap.FetchItemBodySection{{}}, } if changedSince > 0 { fetchOpts.ChangedSince = changedSince } fetchCmd := client.Fetch(seqSet, fetchOpts) for { msg := fetchCmd.Next() if msg == nil { break } kind, messageID, err := w.processMessage(ctx, msg, accountID, userID, folderID, updatesOnly) if err != nil { w.logger.Error("process message failed", "folder_id", folderID, "error", err) continue } if kind != "" && w.pipeline != nil { w.pipeline.handle(ctx, postSyncEvent{ userID: userID, accountID: accountID, messageID: messageID, kind: kind, }) } } return fetchCmd.Close() } func (w *SyncWorker) reconcileDeletions(ctx context.Context, client *imapclient.Client, folderID, userID, accountID string) error { searchData, err := client.UIDSearch(&imap.SearchCriteria{}, &imap.SearchOptions{ReturnAll: true}).Wait() if err != nil { return err } remoteUIDs := uidSetToMap(searchData.All) if len(remoteUIDs) == 0 && searchData.Count == 0 { return nil } rows, err := w.db.Query(ctx, `SELECT id, uid FROM messages WHERE folder_id = $1`, folderID) if err != nil { return err } defer rows.Close() for rows.Next() { var messageID string var uid uint32 if err := rows.Scan(&messageID, &uid); err != nil { return err } if !remoteUIDs[uid] { if _, err := w.db.Exec(ctx, `DELETE FROM messages WHERE id = $1`, messageID); err != nil { return err } if w.pipeline != nil { w.pipeline.handle(ctx, postSyncEvent{ userID: userID, accountID: accountID, messageID: messageID, kind: "deleted", }) } } } return rows.Err() } func uidSetToMap(set imap.NumSet) map[uint32]bool { out := make(map[uint32]bool) if set == nil { return out } uidSet, ok := set.(imap.UIDSet) if !ok { return out } uids, ok := uidSet.Nums() if !ok { return out } for _, uid := range uids { out[uint32(uid)] = true } return out } func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMessageData, accountID, userID, folderID string, updatesOnly bool) (kind, messageID string, err error) { var envelope *imap.Envelope var uid imap.UID var flags []imap.Flag var bodyContent []byte for { item := msg.Next() if item == nil { break } switch data := item.(type) { case imapclient.FetchItemDataUID: uid = data.UID case imapclient.FetchItemDataFlags: flags = data.Flags case imapclient.FetchItemDataEnvelope: envelope = data.Envelope case imapclient.FetchItemDataBodySection: if data.Literal == nil { break } buf := make([]byte, 0, 4096) b := make([]byte, 4096) for { n, readErr := data.Literal.Read(b) buf = append(buf, b[:n]...) if readErr != nil { break } } bodyContent = buf } } if envelope == nil || uid == 0 { return "", "", nil } flagStrs := flagsToStrings(flags) fromAddr := addressesToJSON(envelope.From) toAddrs := addressesToJSON(envelope.To) ccAddrs := addressesToJSON(envelope.Cc) bodyText, bodyHTML := parseBody(bodyContent) snippet := truncate(bodyText, 200) headerRefs, headerInReplyTo := parseThreadHeaders(bodyContent) inReplyTo := headerInReplyTo if inReplyTo == "" && len(envelope.InReplyTo) > 0 { inReplyTo = threading.NormalizeMessageID(envelope.InReplyTo[0]) } references := headerRefs if len(references) == 0 { references = threading.ParseMessageIDs(strings.Join(envelope.InReplyTo, " ")) } var existed bool _ = w.db.QueryRow(ctx, ` SELECT EXISTS(SELECT 1 FROM messages WHERE folder_id = $1 AND uid = $2) `, folderID, uid).Scan(&existed) err = w.db.QueryRow(ctx, ` INSERT INTO messages (account_id, folder_id, uid, message_id, subject, from_addr, to_addrs, cc_addrs, date, snippet, body_text, body_html, flags, in_reply_to, references_header) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) ON CONFLICT (folder_id, uid) DO UPDATE SET message_id = EXCLUDED.message_id, subject = EXCLUDED.subject, from_addr = EXCLUDED.from_addr, to_addrs = EXCLUDED.to_addrs, cc_addrs = EXCLUDED.cc_addrs, date = EXCLUDED.date, snippet = EXCLUDED.snippet, body_text = EXCLUDED.body_text, body_html = EXCLUDED.body_html, flags = EXCLUDED.flags, in_reply_to = EXCLUDED.in_reply_to, references_header = EXCLUDED.references_header, updated_at = NOW() RETURNING id `, accountID, folderID, uid, envelope.MessageID, envelope.Subject, fromAddr, toAddrs, ccAddrs, envelope.Date, snippet, bodyText, bodyHTML, flagStrs, inReplyTo, references, ).Scan(&messageID) if err != nil { return "", "", err } threadID, err := threading.AssignThreadID(ctx, w.db, accountID, inReplyTo, references) if err != nil { return "", "", err } _, err = w.db.Exec(ctx, `UPDATE messages SET thread_id = $1, updated_at = NOW() WHERE id = $2`, threadID, messageID) if err != nil { return "", "", err } if err := w.storeAttachments(ctx, userID, messageID, bodyContent, existed); err != nil { w.logger.Warn("attachment store failed", "message_id", messageID, "error", err) } if existed { return "updated", messageID, nil } if updatesOnly { return "updated", messageID, nil } return "created", messageID, nil } func (w *SyncWorker) storeAttachments(ctx context.Context, userID, messageID string, raw []byte, messageExisted bool) error { if w.storage == nil || len(raw) == 0 { return nil } if messageExisted { var attCount int _ = w.db.QueryRow(ctx, `SELECT COUNT(*) FROM attachments WHERE message_id = $1`, messageID).Scan(&attCount) if attCount > 0 { return nil } } parts, err := ExtractAttachments(raw) if err != nil || len(parts) == 0 { return err } bucket := w.attachBucket if bucket == "" { bucket = "mail-attachments" } for _, part := range parts { if err := limits.ValidateAttachmentSize(int64(len(part.Data))); err != nil { continue } objectKey := storage.MessageObjectKey(userID, messageID, part.Filename) if err := w.storage.Put(ctx, objectKey, bytes.NewReader(part.Data), int64(len(part.Data)), part.ContentType); err != nil { return err } _, err := w.db.Exec(ctx, ` INSERT INTO attachments (message_id, filename, content_type, size, s3_bucket, s3_key, content_id, is_inline) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) `, messageID, part.Filename, part.ContentType, len(part.Data), bucket, objectKey, part.ContentID, part.IsInline) if err != nil { _ = w.storage.Delete(ctx, objectKey) return err } } _, err = w.db.Exec(ctx, `UPDATE messages SET has_attachments = true, updated_at = NOW() WHERE id = $1`, messageID) return err } func flagsToStrings(flags []imap.Flag) []string { out := make([]string, len(flags)) for i, f := range flags { out[i] = string(f) } return out } func (w *SyncWorker) resolveCredential(ctx context.Context, accountID string, creds []byte) (credentials.Credential, error) { if len(creds) == 0 { return credentials.Credential{}, errors.New("missing credentials") } if !credentials.IsEncrypted(creds) { return credentials.Credential{}, errors.New("plaintext credentials forbidden") } if w.credentials == nil { return credentials.Credential{}, errors.New("credential manager not configured") } cred, err := w.credentials.DecryptCredential(creds) if err != nil { return credentials.Credential{}, err } if w.oauth != nil && cred.IsOAuth() { return mailoauth.RefreshAccountCredential(ctx, w.db, w.credentials, w.oauth, accountID, cred) } return cred, nil } func truncate(s string, maxLen int) string { if len(s) <= maxLen { return s } return s[:maxLen] }