package imap

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"github.com/emersion/go-imap/v2"
	"github.com/emersion/go-imap/v2/imapclient"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/ultisuite/ulti-backend/internal/mail/connect"
	"github.com/ultisuite/ulti-backend/internal/mail/credentials"
	"github.com/ultisuite/ulti-backend/internal/mail/limits"
	mailoauth "github.com/ultisuite/ulti-backend/internal/mail/oauth"
	"github.com/ultisuite/ulti-backend/internal/mail/rules"
	"github.com/ultisuite/ulti-backend/internal/mail/sanitize"
	"github.com/ultisuite/ulti-backend/internal/mail/storage"
	"github.com/ultisuite/ulti-backend/internal/mail/threading"
	"github.com/ultisuite/ulti-backend/internal/observability"
	"github.com/ultisuite/ulti-backend/internal/realtime"
)

// MailAutomation is the hook that runs rules and outbound webhooks once a
// newly synced message has been stored.
type MailAutomation interface {
	// OnMailCreated receives the owning user, the mail account and the
	// message ID, together with the rules-engine view of the message.
	OnMailCreated(
		ctx context.Context,
		userID, accountID, messageID string,
		msg *rules.Message,
	)
}

// SyncDeps bundles the optional services wired into the IMAP sync worker.
type SyncDeps struct { Storage *storage.Client AttachBucket string Rules *rules.Engine Automation MailAutomation Hub *realtime.Hub } type SyncWorker struct { db *pgxpool.Pool logger *slog.Logger interval time.Duration credentials *credentials.Manager oauth *mailoauth.Service storage *storage.Client attachBucket string pipeline *syncPipeline } func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *credentials.Manager, oauthSvc *mailoauth.Service, deps SyncDeps) *SyncWorker { return &SyncWorker{ db: db, logger: slog.Default().With("component", "imap-sync"), interval: interval, credentials: credManager, oauth: oauthSvc, storage: deps.Storage, attachBucket: deps.AttachBucket, pipeline: newSyncPipeline(db, deps.Rules, deps.Automation, deps.Hub), } } func (w *SyncWorker) Start(ctx context.Context) { ticker := time.NewTicker(w.interval) defer ticker.Stop() w.logger.Info("imap sync worker started", "interval", w.interval) w.runSyncCycle(ctx) for { select { case <-ctx.Done(): w.logger.Info("imap sync worker stopped") return case <-ticker.C: w.runSyncCycle(ctx) } } } func (w *SyncWorker) runSyncCycle(ctx context.Context) { start := time.Now() if err := w.syncAllAccounts(ctx); err != nil { observability.ObserveIMAPSync("error", time.Since(start)) w.logger.Error("sync cycle failed", "error", err) return } observability.ObserveIMAPSync("success", time.Since(start)) } func (w *SyncWorker) syncAllAccounts(ctx context.Context) error { rows, err := w.db.Query(ctx, ` SELECT id, imap_host, imap_port, imap_tls, credentials FROM mail_accounts WHERE is_active = true `) if err != nil { return err } defer rows.Close() hasSyncError := false for rows.Next() { var ( accountID string host string port int useTLS bool creds []byte ) if err := rows.Scan(&accountID, &host, &port, &useTLS, &creds); err != nil { w.logger.Error("failed to scan account", "error", err) hasSyncError = true continue } if err := w.syncAccount(ctx, accountID, host, port, useTLS, creds); err != nil { 
w.logger.Error("sync failed", "account_id", accountID, "error", err) hasSyncError = true } } if err := rows.Err(); err != nil { return err } if hasSyncError { return errors.New("one or more account sync failed") } return nil } // SyncAccountForUser triggers an immediate IMAP sync for a single owned account. func (w *SyncWorker) SyncAccountForUser(ctx context.Context, externalID, accountID string) error { return w.syncAccountForUser(ctx, externalID, accountID, false) } // ForceSyncAccountForUser resets sync cursors then re-fetches every message // from IMAP, re-applying sanitization. Existing rows are updated in place. func (w *SyncWorker) ForceSyncAccountForUser(ctx context.Context, externalID, accountID string) error { return w.syncAccountForUser(ctx, externalID, accountID, true) } func (w *SyncWorker) syncAccountForUser(ctx context.Context, externalID, accountID string, force bool) error { var ( host string port int useTLS bool creds []byte ) err := w.db.QueryRow(ctx, ` SELECT ma.imap_host, ma.imap_port, ma.imap_tls, ma.credentials FROM mail_accounts ma JOIN users u ON ma.user_id = u.id WHERE ma.id = $1 AND u.external_id = $2 AND ma.is_active = true `, accountID, externalID).Scan(&host, &port, &useTLS, &creds) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("account not found") } return err } if force { if err := resetAccountSyncCursors(ctx, w.db, accountID); err != nil { return fmt.Errorf("reset sync cursors: %w", err) } w.logger.Info("force sync: reset cursors", "account_id", accountID) } return w.syncAccount(ctx, accountID, host, port, useTLS, creds) } func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, port int, useTLS bool, creds []byte) error { userID, err := w.accountUserID(ctx, accountID) if err != nil { return err } addr := fmt.Sprintf("%s:%d", host, port) opts := &imapclient.Options{} var client *imapclient.Client if useTLS { client, err = imapclient.DialTLS(addr, opts) } else { client, err = 
imapclient.DialStartTLS(addr, opts) } if err != nil { return fmt.Errorf("dial: %w", err) } defer client.Close() cred, err := w.resolveCredential(ctx, accountID, creds) if err != nil { return fmt.Errorf("decrypt credentials: %w", err) } if err := connect.AuthenticateIMAP(client, cred); err != nil { return fmt.Errorf("login: %w", err) } listOpts := &imap.ListOptions{ReturnSpecialUse: true} mailboxes, err := client.List("", "*", listOpts).Collect() if err != nil { return fmt.Errorf("list: %w", err) } for _, mbox := range mailboxes { folderType := DetectFolderType(mbox.Mailbox, mbox.Attrs) if folderType == "" { continue } if err := w.syncFolder(ctx, client, accountID, userID, mbox.Mailbox, folderType); err != nil { w.logger.Error("folder sync failed", "account_id", accountID, "folder", mbox.Mailbox, "error", err) } } w.tagImportantFolderMessages(ctx, accountID) _, err = w.db.Exec(ctx, `UPDATE mail_accounts SET last_sync_at = NOW() WHERE id = $1`, accountID) return err } func (w *SyncWorker) accountUserID(ctx context.Context, accountID string) (string, error) { var userID string err := w.db.QueryRow(ctx, `SELECT user_id FROM mail_accounts WHERE id = $1`, accountID).Scan(&userID) return userID, err } func (w *SyncWorker) syncFolder(ctx context.Context, client *imapclient.Client, accountID, userID, folderName, folderType string) error { selectData, err := client.Select(folderName, &imap.SelectOptions{CondStore: true}).Wait() if err != nil { return fmt.Errorf("select %s: %w", folderName, err) } displayName := DisplayName(folderName, folderType) prevState, hasPrev, err := loadFolderSyncState(ctx, w.db, accountID, folderName) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("load folder state: %w", err) } var folderID string if hasPrev && prevState.UIDValidity != 0 && prevState.UIDValidity != selectData.UIDValidity { if err := resetFolderMessages(ctx, w.db, prevState.FolderID); err != nil { return fmt.Errorf("reset folder after uidvalidity change: %w", 
err) } prevState.LastUID = 0 prevState.HighestModSeq = 0 } err = w.db.QueryRow(ctx, ` INSERT INTO mail_folders (account_id, name, remote_name, folder_type, uidvalidity, highest_modseq, message_count, last_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (account_id, remote_name) DO UPDATE SET name = EXCLUDED.name, folder_type = EXCLUDED.folder_type, uidvalidity = EXCLUDED.uidvalidity, highest_modseq = GREATEST(mail_folders.highest_modseq, EXCLUDED.highest_modseq), message_count = EXCLUDED.message_count, updated_at = NOW() RETURNING id, last_uid, highest_modseq `, accountID, displayName, folderName, folderType, selectData.UIDValidity, selectData.HighestModSeq, selectData.NumMessages, prevState.LastUID, ).Scan(&folderID, &prevState.LastUID, &prevState.HighestModSeq) if err != nil { return fmt.Errorf("upsert folder: %w", err) } prevState.FolderID = folderID prevState.UIDValidity = selectData.UIDValidity if selectData.NumMessages == 0 { return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, 0, 0) } lastUID := prevState.LastUID derivedLabels := FolderDerivedLabels(folderName) if lastUID > 0 { if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, lastUID+1, 0, false, derivedLabels); err != nil { return err } } else { if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, 1, 0, false, derivedLabels); err != nil { return err } } if selectData.HighestModSeq > 0 && prevState.HighestModSeq > 0 && selectData.HighestModSeq > prevState.HighestModSeq { if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, 1, prevState.HighestModSeq, true, derivedLabels); err != nil { w.logger.Warn("condstore incremental fetch failed", "folder", folderName, "error", err) } } if err := w.reconcileDeletions(ctx, client, folderID, userID, accountID); err != nil { w.logger.Warn("deletion reconcile failed", "folder", folderName, "error", err) } var maxUID uint32 _ = w.db.QueryRow(ctx, `SELECT 
COALESCE(MAX(uid), 0) FROM messages WHERE folder_id = $1`, folderID).Scan(&maxUID) return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, maxUID, int(selectData.NumMessages)) } func (w *SyncWorker) fetchAndProcess(ctx context.Context, client *imapclient.Client, accountID, userID, folderID string, fromUID uint32, changedSince uint64, updatesOnly bool, derivedLabels []string) error { seqSet := imap.UIDSet{} seqSet.AddRange(imap.UID(fromUID), imap.UID(0)) fetchOpts := &imap.FetchOptions{ UID: true, Flags: true, Envelope: true, ModSeq: changedSince > 0, BodySection: []*imap.FetchItemBodySection{{}}, } if changedSince > 0 { fetchOpts.ChangedSince = changedSince } fetchCmd := client.Fetch(seqSet, fetchOpts) for { msg := fetchCmd.Next() if msg == nil { break } kind, messageID, err := w.processMessage(ctx, msg, accountID, userID, folderID, updatesOnly, derivedLabels) if err != nil { w.logger.Error("process message failed", "folder_id", folderID, "error", err) continue } if kind != "" && w.pipeline != nil { w.pipeline.handle(ctx, postSyncEvent{ userID: userID, accountID: accountID, messageID: messageID, kind: kind, }) } } return fetchCmd.Close() } func (w *SyncWorker) reconcileDeletions(ctx context.Context, client *imapclient.Client, folderID, userID, accountID string) error { searchData, err := client.UIDSearch(&imap.SearchCriteria{}, &imap.SearchOptions{ReturnAll: true}).Wait() if err != nil { return err } remoteUIDs := uidSetToMap(searchData.All) if len(remoteUIDs) == 0 && searchData.Count == 0 { return nil } rows, err := w.db.Query(ctx, `SELECT id, uid FROM messages WHERE folder_id = $1`, folderID) if err != nil { return err } defer rows.Close() for rows.Next() { var messageID string var uid uint32 if err := rows.Scan(&messageID, &uid); err != nil { return err } if !remoteUIDs[uid] { if _, err := w.db.Exec(ctx, `DELETE FROM messages WHERE id = $1`, messageID); err != nil { return err } if w.pipeline != nil { 
w.pipeline.handle(ctx, postSyncEvent{ userID: userID, accountID: accountID, messageID: messageID, kind: "deleted", }) } } } return rows.Err() } func uidSetToMap(set imap.NumSet) map[uint32]bool { out := make(map[uint32]bool) if set == nil { return out } uidSet, ok := set.(imap.UIDSet) if !ok { return out } uids, ok := uidSet.Nums() if !ok { return out } for _, uid := range uids { out[uint32(uid)] = true } return out } func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMessageData, accountID, userID, folderID string, updatesOnly bool, derivedLabels []string) (kind, messageID string, err error) { var envelope *imap.Envelope var uid imap.UID var flags []imap.Flag var bodyContent []byte for { item := msg.Next() if item == nil { break } switch data := item.(type) { case imapclient.FetchItemDataUID: uid = data.UID case imapclient.FetchItemDataFlags: flags = data.Flags case imapclient.FetchItemDataEnvelope: envelope = data.Envelope case imapclient.FetchItemDataBodySection: if data.Literal == nil { break } buf := make([]byte, 0, 4096) b := make([]byte, 4096) for { n, readErr := data.Literal.Read(b) buf = append(buf, b[:n]...) 
if readErr != nil { break } } bodyContent = buf } } if envelope == nil || uid == 0 { return "", "", nil } flagStrs := flagsToStrings(flags) fromList := envelope.From if len(fromList) == 0 { fromList = envelope.Sender } fromAddr := addressesToJSON(fromList) if len(fromList) == 0 { if hdrFrom := parseFromHeader(bodyContent); len(hdrFrom) > 0 { b, _ := json.Marshal(hdrFrom) fromAddr = b } } if isEmptyFromJSON(fromAddr) { var folderType string _ = w.db.QueryRow(ctx, `SELECT folder_type FROM mail_folders WHERE id = $1`, folderID).Scan(&folderType) if folderType == "sent" { if acctFrom, err := w.accountFromJSON(ctx, accountID); err == nil && len(acctFrom) > 0 { fromAddr = acctFrom } } } toAddrs := addressesToJSON(envelope.To) ccAddrs := addressesToJSON(envelope.Cc) bodyText, bodyHTML := parseBody(bodyContent) snippet := SnippetFromBodies(bodyText, bodyHTML, 200) headerRefs, headerInReplyTo := parseThreadHeaders(bodyContent) inReplyTo := headerInReplyTo if inReplyTo == "" && len(envelope.InReplyTo) > 0 { inReplyTo = threading.NormalizeMessageID(envelope.InReplyTo[0]) } references := headerRefs if references == nil { references = []string{} } rfcMessageID := threading.NormalizeMessageID(envelope.MessageID) replyToJSON, authJSON := parseMessageMeta(bodyContent, envelope) subject := RepairSubject(envelope.Subject, bodyText, bodyHTML, bodyContent) snippet = toValidUTF8(snippet) bodyText = toValidUTF8(bodyText) bodyHTML = toValidUTF8(sanitize.SanitizeHTML(bodyHTML)) var existed bool _ = w.db.QueryRow(ctx, ` SELECT EXISTS(SELECT 1 FROM messages WHERE folder_id = $1 AND uid = $2) `, folderID, uid).Scan(&existed) err = w.db.QueryRow(ctx, ` INSERT INTO messages (account_id, folder_id, uid, message_id, subject, from_addr, to_addrs, cc_addrs, reply_to, auth_info, date, snippet, body_text, body_html, flags, in_reply_to, references_header) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (folder_id, uid) DO UPDATE SET message_id = 
EXCLUDED.message_id, subject = EXCLUDED.subject, from_addr = EXCLUDED.from_addr, to_addrs = EXCLUDED.to_addrs, cc_addrs = EXCLUDED.cc_addrs, reply_to = EXCLUDED.reply_to, auth_info = EXCLUDED.auth_info, date = EXCLUDED.date, snippet = EXCLUDED.snippet, body_text = EXCLUDED.body_text, body_html = EXCLUDED.body_html, flags = EXCLUDED.flags, in_reply_to = EXCLUDED.in_reply_to, references_header = EXCLUDED.references_header, updated_at = NOW() RETURNING id `, accountID, folderID, uid, rfcMessageID, subject, fromAddr, toAddrs, ccAddrs, replyToJSON, authJSON, envelope.Date, snippet, bodyText, bodyHTML, flagStrs, inReplyTo, references, ).Scan(&messageID) if err != nil { return "", "", err } if err := threading.ApplyMessageThread(ctx, w.db, accountID, messageID, rfcMessageID, inReplyTo, references); err != nil { return "", "", err } if len(derivedLabels) > 0 { if _, err := w.db.Exec(ctx, ` UPDATE messages SET labels = ( SELECT COALESCE(array_agg(DISTINCT elem), '{}') FROM unnest(COALESCE(labels, '{}') || $1::text[]) AS elem ), updated_at = NOW() WHERE id = $2 `, derivedLabels, messageID); err != nil { return "", "", err } } if err := w.storeAttachments(ctx, userID, messageID, bodyContent, existed); err != nil { w.logger.Warn("attachment store failed", "message_id", messageID, "error", err) } if existed { return "updated", messageID, nil } if updatesOnly { return "updated", messageID, nil } return "created", messageID, nil } func (w *SyncWorker) tagImportantFolderMessages(ctx context.Context, accountID string) { _, err := w.db.Exec(ctx, ` UPDATE messages m SET labels = ( SELECT COALESCE(array_agg(DISTINCT elem), '{}') FROM unnest(COALESCE(m.labels, '{}') || ARRAY['important']) AS elem ), updated_at = NOW() FROM mail_folders mf WHERE m.folder_id = mf.id AND m.account_id = $1 AND LOWER(mf.name) = 'important' AND NOT (COALESCE(m.labels, '{}') @> ARRAY['important']) `, accountID) if err != nil { w.logger.Warn("tag important folder messages failed", "account_id", accountID, 
"error", err) } } func (w *SyncWorker) storeAttachments(ctx context.Context, userID, messageID string, raw []byte, messageExisted bool) error { if w.storage == nil || len(raw) == 0 { return nil } parts, err := ExtractAttachments(raw) if err != nil || len(parts) == 0 { return err } bucket := w.attachBucket if bucket == "" { bucket = "mail-attachments" } for _, part := range parts { if err := limits.ValidateAttachmentSize(int64(len(part.Data))); err != nil { continue } if messageExisted && attachmentPartExists(ctx, w.db, messageID, part) { continue } objectKey := storage.MessageObjectKey(userID, messageID, part.Filename) if err := w.storage.Put(ctx, objectKey, bytes.NewReader(part.Data), int64(len(part.Data)), part.ContentType); err != nil { return err } _, err := w.db.Exec(ctx, ` INSERT INTO attachments (message_id, filename, content_type, size, s3_bucket, s3_key, content_id, is_inline) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) `, messageID, part.Filename, part.ContentType, len(part.Data), bucket, objectKey, part.ContentID, part.IsInline) if err != nil { _ = w.storage.Delete(ctx, objectKey) return err } } _, err = w.db.Exec(ctx, `UPDATE messages SET has_attachments = true, updated_at = NOW() WHERE id = $1`, messageID) return err } func attachmentPartExists(ctx context.Context, db *pgxpool.Pool, messageID string, part AttachmentPart) bool { var count int if part.ContentID != "" { _ = db.QueryRow(ctx, ` SELECT COUNT(*) FROM attachments WHERE message_id = $1 AND (content_id = $2 OR filename = $3) `, messageID, part.ContentID, part.Filename).Scan(&count) return count > 0 } _ = db.QueryRow(ctx, ` SELECT COUNT(*) FROM attachments WHERE message_id = $1 AND filename = $2 `, messageID, part.Filename).Scan(&count) return count > 0 } // ReindexMessageAttachments fetches one message from IMAP and stores missing inline parts. 
func (w *SyncWorker) ReindexMessageAttachments(ctx context.Context, externalID, messageID string) error { if w.storage == nil { return errors.New("object storage unavailable") } var accountID, userID, folderName string var uid uint32 err := w.db.QueryRow(ctx, ` SELECT m.account_id, ma.user_id, mf.remote_name, m.uid FROM messages m JOIN mail_accounts ma ON m.account_id = ma.id JOIN users u ON ma.user_id = u.id JOIN mail_folders mf ON m.folder_id = mf.id WHERE m.id = $1 AND u.external_id = $2 AND ma.is_active = true `, messageID, externalID).Scan(&accountID, &userID, &folderName, &uid) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("message not found") } return err } if uid == 0 { return fmt.Errorf("message has no IMAP uid") } var host string var port int var useTLS bool var creds []byte err = w.db.QueryRow(ctx, `SELECT imap_host, imap_port, imap_tls, credentials FROM mail_accounts WHERE id = $1`, accountID). Scan(&host, &port, &useTLS, &creds) if err != nil { return err } addr := fmt.Sprintf("%s:%d", host, port) var client *imapclient.Client if useTLS { client, err = imapclient.DialTLS(addr, &imapclient.Options{}) } else { client, err = imapclient.DialStartTLS(addr, &imapclient.Options{}) } if err != nil { return fmt.Errorf("dial: %w", err) } defer client.Close() cred, err := w.resolveCredential(ctx, accountID, creds) if err != nil { return err } if err := connect.AuthenticateIMAP(client, cred); err != nil { return fmt.Errorf("login: %w", err) } if _, err := client.Select(folderName, nil).Wait(); err != nil { return fmt.Errorf("select: %w", err) } bodyContent, err := fetchIMAPMessageRawBody(client, uid) if err != nil { return err } return w.storeAttachments(ctx, userID, messageID, bodyContent, true) } type RefetchBodiesResult struct { Scanned int `json:"scanned"` Updated int `json:"updated"` } const refetchBodiesBatchSize = 100 // RefetchAccountBodiesForUser re-downloads raw MIME from IMAP and re-parses bodies // (text/html + inline 
attachments). Fixes messages imported with outdated sanitization. func (w *SyncWorker) RefetchAccountBodiesForUser(ctx context.Context, externalID, accountID string) (RefetchBodiesResult, error) { var result RefetchBodiesResult if w.storage == nil { return result, errors.New("object storage unavailable") } var host string var port int var useTLS bool var creds []byte err := w.db.QueryRow(ctx, ` SELECT ma.imap_host, ma.imap_port, ma.imap_tls, ma.credentials FROM mail_accounts ma JOIN users u ON ma.user_id = u.id WHERE ma.id = $1 AND u.external_id = $2 AND ma.is_active = true `, accountID, externalID).Scan(&host, &port, &useTLS, &creds) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return result, fmt.Errorf("account not found") } return result, err } type messageRef struct { id string userID string folderName string uid uint32 } var lastID string for { rows, err := w.db.Query(ctx, ` SELECT m.id, ma.user_id, mf.remote_name, m.uid FROM messages m JOIN mail_accounts ma ON m.account_id = ma.id JOIN mail_folders mf ON m.folder_id = mf.id WHERE m.account_id = $1 AND m.uid > 0 AND ($2 = '' OR m.id > $2::uuid) ORDER BY m.id LIMIT $3 `, accountID, lastID, refetchBodiesBatchSize) if err != nil { return result, err } batch := make([]messageRef, 0, refetchBodiesBatchSize) for rows.Next() { var ref messageRef if err := rows.Scan(&ref.id, &ref.userID, &ref.folderName, &ref.uid); err != nil { rows.Close() return result, err } batch = append(batch, ref) lastID = ref.id } if err := rows.Err(); err != nil { rows.Close() return result, err } rows.Close() if len(batch) == 0 { break } byFolder := make(map[string][]messageRef) for _, ref := range batch { byFolder[ref.folderName] = append(byFolder[ref.folderName], ref) } addr := fmt.Sprintf("%s:%d", host, port) var client *imapclient.Client if useTLS { client, err = imapclient.DialTLS(addr, &imapclient.Options{}) } else { client, err = imapclient.DialStartTLS(addr, &imapclient.Options{}) } if err != nil { return result, 
fmt.Errorf("dial: %w", err) } cred, err := w.resolveCredential(ctx, accountID, creds) if err != nil { client.Close() return result, err } if err := connect.AuthenticateIMAP(client, cred); err != nil { client.Close() return result, fmt.Errorf("login: %w", err) } for folderName, refs := range byFolder { if _, err := client.Select(folderName, nil).Wait(); err != nil { w.logger.Warn("refetch bodies: select folder failed", "folder", folderName, "error", err) result.Scanned += len(refs) continue } for _, ref := range refs { result.Scanned++ updated, err := w.refetchStoredMessageBody(ctx, client, ref.userID, ref.id, ref.uid) if err != nil { w.logger.Warn("refetch message body failed", "message_id", ref.id, "error", err) continue } if updated { result.Updated++ } } } client.Close() if len(batch) < refetchBodiesBatchSize { break } } return result, nil } func (w *SyncWorker) refetchStoredMessageBody(ctx context.Context, client *imapclient.Client, userID, messageID string, uid uint32) (bool, error) { bodyContent, err := fetchIMAPMessageRawBody(client, uid) if err != nil { return false, err } if len(bodyContent) == 0 { return false, nil } bodyText, bodyHTML := parseBody(bodyContent) bodyText = toValidUTF8(bodyText) bodyHTML = toValidUTF8(sanitize.SanitizeHTML(bodyHTML)) snippet := SnippetFromBodies(bodyText, bodyHTML, 200) tag, err := w.db.Exec(ctx, ` UPDATE messages SET body_text = $2, body_html = $3, snippet = CASE WHEN $4 <> '' THEN $4 ELSE snippet END, updated_at = NOW() WHERE id = $1 AND (body_text IS DISTINCT FROM $2 OR body_html IS DISTINCT FROM $3) `, messageID, bodyText, bodyHTML, snippet) if err != nil { return false, err } if err := w.storeAttachments(ctx, userID, messageID, bodyContent, true); err != nil { return false, err } return tag.RowsAffected() > 0, nil } func fetchIMAPMessageRawBody(client *imapclient.Client, uid uint32) ([]byte, error) { seqSet := imap.UIDSet{} seqSet.AddNum(imap.UID(uid)) fetchCmd := client.Fetch(seqSet, &imap.FetchOptions{ UID: true, 
BodySection: []*imap.FetchItemBodySection{{}}, }) msg := fetchCmd.Next() if msg == nil { return nil, fmt.Errorf("message not found on server") } var bodyContent []byte for { item := msg.Next() if item == nil { break } if data, ok := item.(imapclient.FetchItemDataBodySection); ok && data.Literal != nil { buf := make([]byte, 0, 4096) b := make([]byte, 4096) for { n, readErr := data.Literal.Read(b) buf = append(buf, b[:n]...) if readErr != nil { break } } bodyContent = buf } } return bodyContent, nil } func isEmptyFromJSON(fromAddr []byte) bool { if len(fromAddr) == 0 || string(fromAddr) == "[]" || string(fromAddr) == "null" { return true } var addrs []EmailAddress if err := json.Unmarshal(fromAddr, &addrs); err != nil { return true } for _, a := range addrs { if strings.TrimSpace(a.Address) != "" || strings.TrimSpace(a.Name) != "" { return false } } return true } func (w *SyncWorker) accountFromJSON(ctx context.Context, accountID string) ([]byte, error) { var email, name string err := w.db.QueryRow(ctx, `SELECT email, name FROM mail_accounts WHERE id = $1`, accountID).Scan(&email, &name) if err != nil { return nil, err } if strings.TrimSpace(email) == "" { return nil, nil } return json.Marshal([]EmailAddress{{Name: name, Address: email}}) } func flagsToStrings(flags []imap.Flag) []string { out := make([]string, len(flags)) for i, f := range flags { out[i] = string(f) } return out } func (w *SyncWorker) resolveCredential(ctx context.Context, accountID string, creds []byte) (credentials.Credential, error) { if len(creds) == 0 { return credentials.Credential{}, errors.New("missing credentials") } if !credentials.IsEncrypted(creds) { return credentials.Credential{}, errors.New("plaintext credentials forbidden") } if w.credentials == nil { return credentials.Credential{}, errors.New("credential manager not configured") } cred, err := w.credentials.DecryptCredential(creds) if err != nil { return credentials.Credential{}, err } if w.oauth != nil && cred.IsOAuth() { return 
mailoauth.RefreshAccountCredential(ctx, w.db, w.credentials, w.oauth, accountID, cred) } return cred, nil } func truncate(s string, maxLen int) string { if len(s) <= maxLen { return s } return s[:maxLen] }