ultisuite-backend/internal/mail/imap/sync.go
R3D347HR4Y f97988b51f
Some checks failed
CI / Go tests (push) Has been cancelled
CI / Integration tests (push) Has been cancelled
CI / DB migrations (push) Has been cancelled
feat(devices): implement mobile device token management and push notifications
- Added device token management API for mobile devices, including registration, unregistration, and listing of devices.
- Implemented push notification functionality using FCM for Android and APNS for iOS.
- Introduced new endpoints for device registration and management in the devices API.
- Enhanced the configuration to support mobile push notifications with optional credentials for FCM and APNS.
- Updated database schema to include a new table for storing device tokens.
- Added integration tests for device management and push notification features.
2026-06-17 00:11:25 +02:00

958 lines
28 KiB
Go

package imap
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strings"
"time"
"github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/credentials"
"github.com/ultisuite/ulti-backend/internal/mail/connect"
"github.com/ultisuite/ulti-backend/internal/filescan"
mailoauth "github.com/ultisuite/ulti-backend/internal/mail/oauth"
"github.com/ultisuite/ulti-backend/internal/mail/limits"
"github.com/ultisuite/ulti-backend/internal/mail/rules"
"github.com/ultisuite/ulti-backend/internal/mail/sanitize"
"github.com/ultisuite/ulti-backend/internal/mail/storage"
"github.com/ultisuite/ulti-backend/internal/mail/threading"
"github.com/ultisuite/ulti-backend/internal/observability"
"github.com/ultisuite/ulti-backend/internal/realtime"
)
// MailAutomation runs rules and outbound webhooks after a new message is synced.
//
// It is handed to newSyncPipeline via SyncDeps.Automation. OnMailCreated gets
// the owning user ID, the mail account ID and the stored message ID, plus the
// rules.Message view of that message. The method returns nothing, so an
// implementation must handle (or log) its own failures.
type MailAutomation interface {
OnMailCreated(ctx context.Context, userID, accountID, messageID string, msg *rules.Message)
}
// SyncDeps bundles the optional services that NewSyncWorker wires into the
// IMAP sync worker.
type SyncDeps struct {
// Storage is the object store for attachments; when nil, attachments are not stored.
Storage *storage.Client
// AttachBucket is recorded on attachment rows; "mail-attachments" is used when empty.
AttachBucket string
// Rules, Automation, Hub and Push are passed to newSyncPipeline, which handles
// the created/updated/deleted events produced by a sync.
Rules *rules.Engine
Automation MailAutomation
Hub *realtime.Hub
// FileScanner, when non-nil, scans each attachment before upload; otherwise the
// attachment row gets virus_scan_status "skipped".
FileScanner *filescan.Scanner
Push Pusher
}
// SyncWorker pulls mail from the IMAP servers of all active mail accounts into
// the database, on a timer (Start) or on demand (SyncAccountForUser & co.), and
// forwards created/updated/deleted messages to the post-sync pipeline.
// Construct it with NewSyncWorker.
type SyncWorker struct {
db *pgxpool.Pool
logger *slog.Logger
// interval is the period between sync cycles run by Start.
interval time.Duration
// credentials decrypts the encrypted credentials column (see resolveCredential).
credentials *credentials.Manager
// oauth, when non-nil, refreshes OAuth credentials before login.
oauth *mailoauth.Service
// storage is the attachment object store; nil disables attachment handling.
storage *storage.Client
// attachBucket is written to attachments.s3_bucket ("mail-attachments" if empty).
attachBucket string
// scanner, when non-nil, scans attachments before they are stored.
scanner *filescan.Scanner
// pipeline receives post-sync events; callers check it for nil before use.
pipeline *syncPipeline
}
// NewSyncWorker builds an IMAP sync worker that Start will run every interval.
//
// credManager decrypts stored account credentials; oauthSvc may be nil, in
// which case OAuth credentials are used without a refresh. deps supplies the
// optional attachment storage and malware scanner, plus the collaborators of
// the post-sync pipeline (rules, automation, realtime hub, push).
func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *credentials.Manager, oauthSvc *mailoauth.Service, deps SyncDeps) *SyncWorker {
	w := &SyncWorker{
		db:          db,
		interval:    interval,
		credentials: credManager,
		oauth:       oauthSvc,
	}
	w.logger = slog.Default().With("component", "imap-sync")
	w.storage = deps.Storage
	w.attachBucket = deps.AttachBucket
	w.scanner = deps.FileScanner
	w.pipeline = newSyncPipeline(db, deps.Rules, deps.Automation, deps.Hub, deps.Push)
	return w
}
// Start blocks. It runs one sync cycle immediately and then one per tick of
// w.interval, and returns when ctx is cancelled. interval must be positive
// (time.NewTicker panics otherwise).
func (w *SyncWorker) Start(ctx context.Context) {
	// Create the ticker first so the schedule is anchored at start-up, not at
	// the end of the first (possibly long) cycle.
	tick := time.NewTicker(w.interval)
	defer tick.Stop()

	w.logger.Info("imap sync worker started", "interval", w.interval)
	w.runSyncCycle(ctx)

	for {
		select {
		case <-tick.C:
			w.runSyncCycle(ctx)
		case <-ctx.Done():
			w.logger.Info("imap sync worker stopped")
			return
		}
	}
}
// runSyncCycle performs one syncAllAccounts pass. It reports the outcome
// ("success" or "error") and elapsed time through
// observability.ObserveIMAPSync, and logs the error on failure.
func (w *SyncWorker) runSyncCycle(ctx context.Context) {
	startedAt := time.Now()
	syncErr := w.syncAllAccounts(ctx)
	if syncErr == nil {
		observability.ObserveIMAPSync("success", time.Since(startedAt))
		return
	}
	observability.ObserveIMAPSync("error", time.Since(startedAt))
	w.logger.Error("sync cycle failed", "error", syncErr)
}
// syncAllAccounts syncs every active mail account once, sequentially.
//
// The account list is read in full and its cursor closed before any IMAP work
// begins. Otherwise one pool connection would stay pinned for the whole cycle
// while each syncAccount issues its own queries on further connections.
// A failure for one account is logged and does not stop the others, and ctx
// cancellation is checked between accounts.
//
// It returns the query or cursor error, ctx.Err() when cancelled, or a generic
// error when at least one account could not be scanned or synced.
func (w *SyncWorker) syncAllAccounts(ctx context.Context) error {
	type activeAccount struct {
		id     string
		host   string
		port   int
		useTLS bool
		creds  []byte
	}
	rows, err := w.db.Query(ctx, `
SELECT id, imap_host, imap_port, imap_tls, credentials
FROM mail_accounts
WHERE is_active = true
`)
	if err != nil {
		return err
	}
	var (
		accounts     []activeAccount
		hasSyncError bool
	)
	for rows.Next() {
		var a activeAccount
		if err := rows.Scan(&a.id, &a.host, &a.port, &a.useTLS, &a.creds); err != nil {
			w.logger.Error("failed to scan account", "error", err)
			hasSyncError = true
			continue
		}
		accounts = append(accounts, a)
	}
	rows.Close()
	// As before, accounts read before a cursor failure are still synced; the
	// cursor error is reported afterwards.
	listErr := rows.Err()

	for _, a := range accounts {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := w.syncAccount(ctx, a.id, a.host, a.port, a.useTLS, a.creds); err != nil {
			w.logger.Error("sync failed", "account_id", a.id, "error", err)
			hasSyncError = true
		}
	}
	if listErr != nil {
		return listErr
	}
	if hasSyncError {
		return errors.New("one or more account sync failed")
	}
	return nil
}
// SyncAccountForUser triggers an immediate IMAP sync for a single owned account.
func (w *SyncWorker) SyncAccountForUser(ctx context.Context, externalID, accountID string) error {
return w.syncAccountForUser(ctx, externalID, accountID, false)
}
// ForceSyncAccountForUser resets sync cursors then re-fetches every message
// from IMAP, re-applying sanitization. Existing rows are updated in place.
func (w *SyncWorker) ForceSyncAccountForUser(ctx context.Context, externalID, accountID string) error {
return w.syncAccountForUser(ctx, externalID, accountID, true)
}
// syncAccountForUser loads the IMAP settings of an active account owned by
// externalID and syncs it. When force is set, the account's sync cursors are
// reset first. It returns "account not found" when no such account exists.
func (w *SyncWorker) syncAccountForUser(ctx context.Context, externalID, accountID string, force bool) error {
	var (
		host   string
		port   int
		useTLS bool
		creds  []byte
	)
	row := w.db.QueryRow(ctx, `
SELECT ma.imap_host, ma.imap_port, ma.imap_tls, ma.credentials
FROM mail_accounts ma
JOIN users u ON ma.user_id = u.id
WHERE ma.id = $1 AND u.external_id = $2 AND ma.is_active = true
`, accountID, externalID)
	switch err := row.Scan(&host, &port, &useTLS, &creds); {
	case errors.Is(err, pgx.ErrNoRows):
		return errors.New("account not found")
	case err != nil:
		return err
	}

	if !force {
		return w.syncAccount(ctx, accountID, host, port, useTLS, creds)
	}
	if err := resetAccountSyncCursors(ctx, w.db, accountID); err != nil {
		return fmt.Errorf("reset sync cursors: %w", err)
	}
	w.logger.Info("force sync: reset cursors", "account_id", accountID)
	return w.syncAccount(ctx, accountID, host, port, useTLS, creds)
}
// syncAccount performs one IMAP sync pass for a single account.
//
// Steps:
//   - resolve (and, for OAuth, possibly refresh) the credential;
//   - connect with implicit TLS (useTLS) or STARTTLS, then authenticate;
//   - sync every mailbox that DetectFolderType maps to a known folder type;
//   - tag messages in an "important" folder and stamp last_sync_at.
//
// Per-folder failures are logged and do not abort the pass. When ctx is
// cancelled between folders, the pass stops and last_sync_at is left untouched.
func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, port int, useTLS bool, creds []byte) error {
	userID, err := w.accountUserID(ctx, accountID)
	if err != nil {
		return err
	}
	// Resolve the credential before dialing. A broken credential or a slow OAuth
	// refresh should not cost an IMAP connection or leave a pre-auth session idling.
	cred, err := w.resolveCredential(ctx, accountID, creds)
	if err != nil {
		return fmt.Errorf("decrypt credentials: %w", err)
	}
	addr := fmt.Sprintf("%s:%d", host, port)
	opts := &imapclient.Options{}
	var client *imapclient.Client
	if useTLS {
		client, err = imapclient.DialTLS(addr, opts)
	} else {
		client, err = imapclient.DialStartTLS(addr, opts)
	}
	if err != nil {
		return fmt.Errorf("dial: %w", err)
	}
	defer client.Close()
	if err := connect.AuthenticateIMAP(client, cred); err != nil {
		return fmt.Errorf("login: %w", err)
	}
	listOpts := &imap.ListOptions{ReturnSpecialUse: true}
	mailboxes, err := client.List("", "*", listOpts).Collect()
	if err != nil {
		return fmt.Errorf("list: %w", err)
	}
	for _, mbox := range mailboxes {
		// IMAP commands issued through client take no ctx, so observe
		// cancellation between folders.
		if err := ctx.Err(); err != nil {
			return err
		}
		folderType := DetectFolderType(mbox.Mailbox, mbox.Attrs)
		if folderType == "" {
			continue
		}
		if err := w.syncFolder(ctx, client, accountID, userID, mbox.Mailbox, folderType); err != nil {
			w.logger.Error("folder sync failed", "account_id", accountID, "folder", mbox.Mailbox, "error", err)
		}
	}
	w.tagImportantFolderMessages(ctx, accountID)
	_, err = w.db.Exec(ctx, `UPDATE mail_accounts SET last_sync_at = NOW() WHERE id = $1`, accountID)
	return err
}
// accountUserID returns mail_accounts.user_id for accountID. Any Scan error is
// returned unchanged (pgx.ErrNoRows when the account does not exist).
func (w *SyncWorker) accountUserID(ctx context.Context, accountID string) (string, error) {
	var owner string
	row := w.db.QueryRow(ctx, `SELECT user_id FROM mail_accounts WHERE id = $1`, accountID)
	if err := row.Scan(&owner); err != nil {
		return owner, err
	}
	return owner, nil
}
// syncFolder synchronises one remote mailbox into mail_folders / messages.
//
// Order of operations:
//  1. SELECT the mailbox with CONDSTORE and load the stored sync state.
//  2. If UIDVALIDITY changed, wipe the folder's local messages and restart the
//     UID/MODSEQ cursors from zero.
//  3. Upsert the mail_folders row; RETURNING supplies folderID and last_uid.
//  4. Fetch messages with UID > last_uid. This is skipped when UIDNEXT shows
//     there can be none.
//  5. If the server's HIGHESTMODSEQ is past the MODSEQ synced last time,
//     re-fetch everything changed since then (e.g. flag changes).
//  6. Reconcile remote deletions, then persist the new cursors.
//
// Errors in steps 5 and 6's deletion pass are logged, not returned.
func (w *SyncWorker) syncFolder(ctx context.Context, client *imapclient.Client, accountID, userID, folderName, folderType string) error {
	selectData, err := client.Select(folderName, &imap.SelectOptions{CondStore: true}).Wait()
	if err != nil {
		return fmt.Errorf("select %s: %w", folderName, err)
	}
	displayName := DisplayName(folderName, folderType)
	prevState, hasPrev, err := loadFolderSyncState(ctx, w.db, accountID, folderName)
	if err != nil && !errors.Is(err, pgx.ErrNoRows) {
		return fmt.Errorf("load folder state: %w", err)
	}
	uidValidityChanged := hasPrev && prevState.UIDValidity != 0 && prevState.UIDValidity != selectData.UIDValidity
	if uidValidityChanged {
		if err := resetFolderMessages(ctx, w.db, prevState.FolderID); err != nil {
			return fmt.Errorf("reset folder after uidvalidity change: %w", err)
		}
		prevState.LastUID = 0
		prevState.HighestModSeq = 0
	}
	// Capture the previously synced MODSEQ *before* the upsert. The upsert
	// stores GREATEST(stored, server) and RETURNING hands that value back, so it
	// is never below selectData.HighestModSeq. Comparing against it made the
	// CONDSTORE pass below unreachable.
	// NOTE(review): the upsert also advances highest_modseq before the fetches
	// run; if a fetch fails, the next cycle may start from the advanced value.
	syncedModSeq := prevState.HighestModSeq
	var folderID string
	err = w.db.QueryRow(ctx, `
INSERT INTO mail_folders (account_id, name, remote_name, folder_type, uidvalidity, highest_modseq, message_count, last_uid)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (account_id, remote_name) DO UPDATE
SET name = EXCLUDED.name,
folder_type = EXCLUDED.folder_type,
uidvalidity = EXCLUDED.uidvalidity,
highest_modseq = GREATEST(mail_folders.highest_modseq, EXCLUDED.highest_modseq),
message_count = EXCLUDED.message_count,
updated_at = NOW()
RETURNING id, last_uid, highest_modseq
`, accountID, displayName, folderName, folderType,
		selectData.UIDValidity, selectData.HighestModSeq, selectData.NumMessages, prevState.LastUID,
	).Scan(&folderID, &prevState.LastUID, &prevState.HighestModSeq)
	if err != nil {
		return fmt.Errorf("upsert folder: %w", err)
	}
	if uidValidityChanged {
		// UIDs from the previous UIDVALIDITY epoch are meaningless. A stale
		// last_uid returned by the upsert must not skip over new messages.
		prevState.LastUID = 0
	}
	prevState.FolderID = folderID
	prevState.UIDValidity = selectData.UIDValidity
	if selectData.NumMessages == 0 {
		// NOTE(review): this returns before reconcileDeletions, so local copies of
		// a folder emptied on the server are kept. Confirm that is intended.
		return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, 0, 0)
	}
	derivedLabels := FolderDerivedLabels(folderName)
	fromUID := prevState.LastUID + 1
	// A UID range "N:*" still matches the highest-UID message when N exceeds
	// every assigned UID (RFC 3501 §6.4.8). Without this UIDNEXT guard, the
	// newest message would be re-downloaded and re-announced every cycle.
	if selectData.UIDNext == 0 || fromUID < uint32(selectData.UIDNext) {
		if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, fromUID, 0, false, derivedLabels); err != nil {
			return err
		}
	}
	if selectData.HighestModSeq > 0 && syncedModSeq > 0 && selectData.HighestModSeq > syncedModSeq {
		if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, 1, syncedModSeq, true, derivedLabels); err != nil {
			w.logger.Warn("condstore incremental fetch failed", "folder", folderName, "error", err)
		}
	}
	if err := w.reconcileDeletions(ctx, client, folderID, userID, accountID); err != nil {
		w.logger.Warn("deletion reconcile failed", "folder", folderName, "error", err)
	}
	var maxUID uint32
	// Do not fall back to 0 on error: saving last_uid = 0 would force a full
	// re-download of the folder on the next cycle.
	if err := w.db.QueryRow(ctx, `SELECT COALESCE(MAX(uid), 0) FROM messages WHERE folder_id = $1`, folderID).Scan(&maxUID); err != nil {
		return fmt.Errorf("load max uid: %w", err)
	}
	return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, maxUID, int(selectData.NumMessages))
}
// fetchAndProcess streams a FETCH of UIDs fromUID:* and upserts each message
// through processMessage. "created"/"updated" results are forwarded to the
// post-sync pipeline.
//
// When changedSince > 0, a CONDSTORE CHANGEDSINCE modifier is added (and
// MODSEQ is requested). updatesOnly makes processMessage report every message
// as "updated". Per-message failures are logged and skipped.
//
// The returned error is ctx.Err() if the context was cancelled mid-stream;
// otherwise it is the FETCH command's completion status.
func (w *SyncWorker) fetchAndProcess(ctx context.Context, client *imapclient.Client, accountID, userID, folderID string, fromUID uint32, changedSince uint64, updatesOnly bool, derivedLabels []string) error {
	seqSet := imap.UIDSet{}
	// imap.UID(0) stands for "*" (the highest UID) in go-imap ranges.
	seqSet.AddRange(imap.UID(fromUID), imap.UID(0))
	fetchOpts := &imap.FetchOptions{
		UID:         true,
		Flags:       true,
		Envelope:    true,
		ModSeq:      changedSince > 0,
		BodySection: []*imap.FetchItemBodySection{{}},
	}
	if changedSince > 0 {
		fetchOpts.ChangedSince = changedSince
	}
	fetchCmd := client.Fetch(seqSet, fetchOpts)
	// Stop pulling messages once ctx is cancelled. Every DB call below would fail
	// anyway and flood the log with one error per remaining message.
	for ctx.Err() == nil {
		msg := fetchCmd.Next()
		if msg == nil {
			break
		}
		kind, messageID, err := w.processMessage(ctx, msg, accountID, userID, folderID, updatesOnly, derivedLabels)
		if err != nil {
			w.logger.Error("process message failed", "folder_id", folderID, "error", err)
			continue
		}
		if kind != "" && w.pipeline != nil {
			w.pipeline.handle(ctx, postSyncEvent{
				userID: userID, accountID: accountID, messageID: messageID, kind: kind,
			})
		}
	}
	// Close drains any unread responses and waits for the tagged completion, so
	// the connection stays usable for the next command.
	closeErr := fetchCmd.Close()
	if err := ctx.Err(); err != nil {
		return err
	}
	return closeErr
}
// reconcileDeletions deletes local messages of folderID whose UID is no longer
// present in the currently selected mailbox. It emits a "deleted" pipeline
// event for each one.
//
// Safety rules:
//   - If the UID SEARCH yields no expandable UIDs, nothing is deleted, so an
//     odd or empty response can never wipe the folder.
//   - Rows with uid = 0 (no IMAP UID assigned) are never considered.
//
// Stale IDs are collected and the cursor closed before any DELETE or pipeline
// call runs.
func (w *SyncWorker) reconcileDeletions(ctx context.Context, client *imapclient.Client, folderID, userID, accountID string) error {
	searchData, err := client.UIDSearch(&imap.SearchCriteria{}, &imap.SearchOptions{ReturnAll: true}).Wait()
	if err != nil {
		return err
	}
	remoteUIDs := uidSetToMap(searchData.All)
	if len(remoteUIDs) == 0 {
		return nil
	}
	rows, err := w.db.Query(ctx, `SELECT id, uid FROM messages WHERE folder_id = $1 AND uid > 0`, folderID)
	if err != nil {
		return err
	}
	var staleIDs []string
	for rows.Next() {
		var messageID string
		var uid uint32
		if err := rows.Scan(&messageID, &uid); err != nil {
			rows.Close()
			return err
		}
		if !remoteUIDs[uid] {
			staleIDs = append(staleIDs, messageID)
		}
	}
	rows.Close()
	if err := rows.Err(); err != nil {
		return err
	}
	for _, messageID := range staleIDs {
		if _, err := w.db.Exec(ctx, `DELETE FROM messages WHERE id = $1`, messageID); err != nil {
			return err
		}
		if w.pipeline != nil {
			w.pipeline.handle(ctx, postSyncEvent{
				userID: userID, accountID: accountID, messageID: messageID, kind: "deleted",
			})
		}
	}
	return nil
}
// uidSetToMap expands a UID SEARCH result into a lookup set keyed by UID.
//
// It returns an empty, non-nil map when set is nil, when set is not an
// imap.UIDSet, or when UIDSet.Nums reports that it cannot list every number
// (presumably because the set contains "*").
func uidSetToMap(set imap.NumSet) map[uint32]bool {
	present := map[uint32]bool{}
	// A comma-ok assertion on a nil interface just yields false, so this also
	// covers set == nil.
	uids, isUIDSet := set.(imap.UIDSet)
	if !isUIDSet {
		return present
	}
	nums, complete := uids.Nums()
	if !complete {
		return present
	}
	for _, n := range nums {
		present[uint32(n)] = true
	}
	return present
}
// processMessage consumes one FETCH response and upserts it into messages.
//
// It collects the UID, flags, envelope and full body section, then:
//   - derives from_addr from Envelope.From, falling back to Envelope.Sender,
//     then the raw From header, then (for "sent" folders) the account's address;
//   - parses the text/HTML bodies, sanitises the HTML and forces valid UTF-8;
//   - upserts the row keyed by (folder_id, uid) and applies threading;
//   - merges derivedLabels into labels and stores attachments (best effort).
//
// A response without an envelope or UID is skipped: kind is "" and err is nil.
// Otherwise kind is "created" when the row did not exist before and updatesOnly
// is false, and "updated" in every other case. A body literal that cannot be
// read completely is reported as an error rather than stored truncated.
func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMessageData, accountID, userID, folderID string, updatesOnly bool, derivedLabels []string) (kind, messageID string, err error) {
	var (
		envelope    *imap.Envelope
		uid         imap.UID
		flags       []imap.Flag
		bodyContent []byte
	)
	for {
		item := msg.Next()
		if item == nil {
			break
		}
		switch data := item.(type) {
		case imapclient.FetchItemDataUID:
			uid = data.UID
		case imapclient.FetchItemDataFlags:
			flags = data.Flags
		case imapclient.FetchItemDataEnvelope:
			envelope = data.Envelope
		case imapclient.FetchItemDataBodySection:
			if data.Literal == nil {
				break
			}
			// ReadFrom returns any non-EOF error. The old manual loop dropped it
			// and kept a silently truncated body.
			var buf bytes.Buffer
			if _, readErr := buf.ReadFrom(data.Literal); readErr != nil {
				return "", "", fmt.Errorf("read body literal: %w", readErr)
			}
			bodyContent = buf.Bytes()
		}
	}
	if envelope == nil || uid == 0 {
		return "", "", nil
	}
	flagStrs := flagsToStrings(flags)
	fromList := envelope.From
	if len(fromList) == 0 {
		fromList = envelope.Sender
	}
	fromAddr := addressesToJSON(fromList)
	if len(fromList) == 0 {
		if hdrFrom := parseFromHeader(bodyContent); len(hdrFrom) > 0 {
			b, _ := json.Marshal(hdrFrom)
			fromAddr = b
		}
	}
	if isEmptyFromJSON(fromAddr) {
		// Best effort: if the lookup fails, folderType stays "" and the
		// account-address fallback is skipped.
		var folderType string
		_ = w.db.QueryRow(ctx, `SELECT folder_type FROM mail_folders WHERE id = $1`, folderID).Scan(&folderType)
		if folderType == "sent" {
			if acctFrom, err := w.accountFromJSON(ctx, accountID); err == nil && len(acctFrom) > 0 {
				fromAddr = acctFrom
			}
		}
	}
	toAddrs := addressesToJSON(envelope.To)
	ccAddrs := addressesToJSON(envelope.Cc)
	bodyText, bodyHTML := parseBody(bodyContent)
	snippet := SnippetFromBodies(bodyText, bodyHTML, 200)
	headerRefs, headerInReplyTo := parseThreadHeaders(bodyContent)
	inReplyTo := headerInReplyTo
	if inReplyTo == "" && len(envelope.InReplyTo) > 0 {
		inReplyTo = threading.NormalizeMessageID(envelope.InReplyTo[0])
	}
	references := headerRefs
	if references == nil {
		references = []string{}
	}
	rfcMessageID := threading.NormalizeMessageID(envelope.MessageID)
	replyToJSON, authJSON := parseMessageMeta(bodyContent, envelope)
	subject := RepairSubject(envelope.Subject, bodyText, bodyHTML, bodyContent)
	snippet = toValidUTF8(snippet)
	bodyText = toValidUTF8(bodyText)
	bodyHTML = toValidUTF8(sanitize.SanitizeHTML(bodyHTML))
	// The created/updated distinction drives pipeline events, so a failed
	// lookup must not be mistaken for "new message".
	var existed bool
	if err := w.db.QueryRow(ctx, `
SELECT EXISTS(SELECT 1 FROM messages WHERE folder_id = $1 AND uid = $2)
`, folderID, uid).Scan(&existed); err != nil {
		return "", "", fmt.Errorf("check existing message: %w", err)
	}
	err = w.db.QueryRow(ctx, `
INSERT INTO messages (account_id, folder_id, uid, message_id, subject, from_addr, to_addrs, cc_addrs, reply_to, auth_info, date, snippet, body_text, body_html, flags, in_reply_to, references_header)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
ON CONFLICT (folder_id, uid) DO UPDATE SET
message_id = EXCLUDED.message_id,
subject = EXCLUDED.subject,
from_addr = EXCLUDED.from_addr,
to_addrs = EXCLUDED.to_addrs,
cc_addrs = EXCLUDED.cc_addrs,
reply_to = EXCLUDED.reply_to,
auth_info = EXCLUDED.auth_info,
date = EXCLUDED.date,
snippet = EXCLUDED.snippet,
body_text = EXCLUDED.body_text,
body_html = EXCLUDED.body_html,
flags = EXCLUDED.flags,
in_reply_to = EXCLUDED.in_reply_to,
references_header = EXCLUDED.references_header,
updated_at = NOW()
RETURNING id
`, accountID, folderID, uid, rfcMessageID, subject,
		fromAddr, toAddrs, ccAddrs, replyToJSON, authJSON, envelope.Date, snippet, bodyText, bodyHTML, flagStrs, inReplyTo, references,
	).Scan(&messageID)
	if err != nil {
		return "", "", err
	}
	if err := threading.ApplyMessageThread(ctx, w.db, accountID, messageID, rfcMessageID, inReplyTo, references); err != nil {
		return "", "", err
	}
	if len(derivedLabels) > 0 {
		if _, err := w.db.Exec(ctx, `
UPDATE messages
SET labels = (
SELECT COALESCE(array_agg(DISTINCT elem), '{}')
FROM unnest(COALESCE(labels, '{}') || $1::text[]) AS elem
),
updated_at = NOW()
WHERE id = $2
`, derivedLabels, messageID); err != nil {
			return "", "", err
		}
	}
	if err := w.storeAttachments(ctx, userID, messageID, bodyContent, existed); err != nil {
		w.logger.Warn("attachment store failed", "message_id", messageID, "error", err)
	}
	if existed || updatesOnly {
		return "updated", messageID, nil
	}
	return "created", messageID, nil
}
func (w *SyncWorker) tagImportantFolderMessages(ctx context.Context, accountID string) {
_, err := w.db.Exec(ctx, `
UPDATE messages m
SET labels = (
SELECT COALESCE(array_agg(DISTINCT elem), '{}')
FROM unnest(COALESCE(m.labels, '{}') || ARRAY['important']) AS elem
),
updated_at = NOW()
FROM mail_folders mf
WHERE m.folder_id = mf.id
AND m.account_id = $1
AND LOWER(mf.name) = 'important'
AND NOT (COALESCE(m.labels, '{}') @> ARRAY['important'])
`, accountID)
if err != nil {
w.logger.Warn("tag important folder messages failed", "account_id", accountID, "error", err)
}
}
// storeAttachments extracts attachment/inline parts from the raw RFC 822
// message. Each part is uploaded to object storage and recorded in the
// attachments table, and the message is flagged has_attachments.
//
// Nothing is done (nil is returned) when storage is not configured, raw is
// empty, or no parts are found. A part is skipped without error when:
//   - it fails limits.ValidateAttachmentSize;
//   - messageExisted is set and attachmentPartExists finds a matching row;
//   - the scanner reports filescan.ErrMalicious.
//
// Any other scan, upload or insert error aborts and is returned. An object
// whose row insert fails is deleted again (best effort).
func (w *SyncWorker) storeAttachments(ctx context.Context, userID, messageID string, raw []byte, messageExisted bool) error {
	if w.storage == nil || len(raw) == 0 {
		return nil
	}
	parts, err := ExtractAttachments(raw)
	if err != nil || len(parts) == 0 {
		return err
	}
	bucket := w.attachBucket
	if bucket == "" {
		bucket = "mail-attachments"
	}
	// Flag the message up front. It has attachment parts whether or not each one
	// can be stored, and an abort half-way must not leave inserted attachment
	// rows on a message still marked has_attachments = false.
	if _, err := w.db.Exec(ctx, `UPDATE messages SET has_attachments = true, updated_at = NOW() WHERE id = $1`, messageID); err != nil {
		return err
	}
	// storage.MessageObjectKey only sees (userID, messageID, filename), so parts
	// sharing a filename (common for inline images or unnamed parts) presumably
	// map to one key. The later upload would overwrite the earlier object while
	// both rows point at it, so disambiguate repeats within this message.
	usedKeys := make(map[string]struct{}, len(parts))
	for i, part := range parts {
		if err := limits.ValidateAttachmentSize(int64(len(part.Data))); err != nil {
			continue
		}
		if messageExisted && attachmentPartExists(ctx, w.db, messageID, part) {
			continue
		}
		scanStatus := "skipped"
		if w.scanner != nil {
			result, err := w.scanner.ScanBytes(ctx, part.Filename, part.Data)
			if err != nil {
				if errors.Is(err, filescan.ErrMalicious) {
					w.logger.Warn("imap attachment skipped: malware detected",
						"message_id", messageID, "filename", part.Filename)
					continue
				}
				return err
			}
			scanStatus = result.Status
		}
		objectKey := storage.MessageObjectKey(userID, messageID, part.Filename)
		if _, dup := usedKeys[objectKey]; dup {
			objectKey = storage.MessageObjectKey(userID, messageID, fmt.Sprintf("%d-%s", i, part.Filename))
		}
		usedKeys[objectKey] = struct{}{}
		if err := w.storage.Put(ctx, objectKey, bytes.NewReader(part.Data), int64(len(part.Data)), part.ContentType); err != nil {
			return err
		}
		if _, err := w.db.Exec(ctx, `
INSERT INTO attachments (message_id, filename, content_type, size, s3_bucket, s3_key, content_id, is_inline, virus_scan_status)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`, messageID, part.Filename, part.ContentType, len(part.Data), bucket, objectKey, part.ContentID, part.IsInline, scanStatus); err != nil {
			_ = w.storage.Delete(ctx, objectKey)
			return err
		}
	}
	return nil
}
// attachmentPartExists reports whether messageID already has an attachments
// row for part. The row is matched by content ID or filename when the part has
// a Content-ID, and by filename alone otherwise.
//
// This is best effort: a query error counts as "not found", so the part will be
// stored again.
func attachmentPartExists(ctx context.Context, db *pgxpool.Pool, messageID string, part AttachmentPart) bool {
	var row pgx.Row
	if part.ContentID == "" {
		row = db.QueryRow(ctx, `
SELECT COUNT(*) FROM attachments WHERE message_id = $1 AND filename = $2
`, messageID, part.Filename)
	} else {
		row = db.QueryRow(ctx, `
SELECT COUNT(*) FROM attachments
WHERE message_id = $1 AND (content_id = $2 OR filename = $3)
`, messageID, part.ContentID, part.Filename)
	}
	var matches int
	_ = row.Scan(&matches)
	return matches > 0
}
// ReindexMessageAttachments re-downloads one message from its IMAP folder and
// stores any attachment parts not yet recorded for it.
//
// The message must belong to an active account of the user with externalID and
// must have an IMAP UID. Object storage must be configured.
func (w *SyncWorker) ReindexMessageAttachments(ctx context.Context, externalID, messageID string) error {
	if w.storage == nil {
		return errors.New("object storage unavailable")
	}
	var (
		accountID, userID, folderName string
		uid                           uint32
	)
	lookup := w.db.QueryRow(ctx, `
SELECT m.account_id, ma.user_id, mf.remote_name, m.uid
FROM messages m
JOIN mail_accounts ma ON m.account_id = ma.id
JOIN users u ON ma.user_id = u.id
JOIN mail_folders mf ON m.folder_id = mf.id
WHERE m.id = $1 AND u.external_id = $2 AND ma.is_active = true
`, messageID, externalID)
	switch err := lookup.Scan(&accountID, &userID, &folderName, &uid); {
	case errors.Is(err, pgx.ErrNoRows):
		return errors.New("message not found")
	case err != nil:
		return err
	case uid == 0:
		return errors.New("message has no IMAP uid")
	}

	var (
		host   string
		port   int
		useTLS bool
		creds  []byte
	)
	if err := w.db.QueryRow(ctx, `SELECT imap_host, imap_port, imap_tls, credentials FROM mail_accounts WHERE id = $1`, accountID).
		Scan(&host, &port, &useTLS, &creds); err != nil {
		return err
	}

	var (
		client *imapclient.Client
		err    error
	)
	addr := fmt.Sprintf("%s:%d", host, port)
	if useTLS {
		client, err = imapclient.DialTLS(addr, &imapclient.Options{})
	} else {
		client, err = imapclient.DialStartTLS(addr, &imapclient.Options{})
	}
	if err != nil {
		return fmt.Errorf("dial: %w", err)
	}
	defer client.Close()

	cred, err := w.resolveCredential(ctx, accountID, creds)
	if err != nil {
		return err
	}
	if err := connect.AuthenticateIMAP(client, cred); err != nil {
		return fmt.Errorf("login: %w", err)
	}
	if _, err := client.Select(folderName, nil).Wait(); err != nil {
		return fmt.Errorf("select: %w", err)
	}
	raw, err := fetchIMAPMessageRawBody(client, uid)
	if err != nil {
		return err
	}
	return w.storeAttachments(ctx, userID, messageID, raw, true)
}
// RefetchBodiesResult summarises a RefetchAccountBodiesForUser run.
type RefetchBodiesResult struct {
// Scanned counts messages considered, including those whose folder could not
// be selected and those whose refetch failed.
Scanned int `json:"scanned"`
// Updated counts messages whose stored body_text or body_html actually changed.
Updated int `json:"updated"`
}
// refetchBodiesBatchSize is the number of messages RefetchAccountBodiesForUser
// loads from the database per page of its id-ordered walk.
const refetchBodiesBatchSize = 100
// RefetchAccountBodiesForUser re-downloads the raw MIME of every message of
// the account that has an IMAP UID. It re-parses each message's bodies (text
// and HTML, through the current sanitizer) and inline attachments, which fixes
// messages imported with outdated sanitization.
//
// The account must be active and owned by the user with externalID, and object
// storage must be configured. Messages are walked in id order, in pages of
// refetchBodiesBatchSize. A single IMAP session is opened lazily on the first
// non-empty page and reused for the whole run. Per-message failures are logged
// and counted as scanned but not updated. Cancellation of ctx is checked
// between pages.
func (w *SyncWorker) RefetchAccountBodiesForUser(ctx context.Context, externalID, accountID string) (RefetchBodiesResult, error) {
	var result RefetchBodiesResult
	if w.storage == nil {
		return result, errors.New("object storage unavailable")
	}
	var host string
	var port int
	var useTLS bool
	var creds []byte
	err := w.db.QueryRow(ctx, `
SELECT ma.imap_host, ma.imap_port, ma.imap_tls, ma.credentials
FROM mail_accounts ma
JOIN users u ON ma.user_id = u.id
WHERE ma.id = $1 AND u.external_id = $2 AND ma.is_active = true
`, accountID, externalID).Scan(&host, &port, &useTLS, &creds)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return result, fmt.Errorf("account not found")
		}
		return result, err
	}
	type messageRef struct {
		id         string
		userID     string
		folderName string
		uid        uint32
	}
	// Reconnecting per page meant one login per refetchBodiesBatchSize messages,
	// which mail providers may throttle. Keep one session for the whole run, but
	// open it lazily so an account with no messages never connects.
	var client *imapclient.Client
	defer func() {
		if client != nil {
			client.Close()
		}
	}()
	var lastID string
	for {
		if err := ctx.Err(); err != nil {
			return result, err
		}
		// Keyset pagination on messages.id. PostgreSQL does not guarantee the
		// evaluation order of OR operands. NULLIF keeps the uuid cast valid on the
		// first page (lastID == "") instead of relying on short-circuiting.
		rows, err := w.db.Query(ctx, `
SELECT m.id, ma.user_id, mf.remote_name, m.uid
FROM messages m
JOIN mail_accounts ma ON m.account_id = ma.id
JOIN mail_folders mf ON m.folder_id = mf.id
WHERE m.account_id = $1
AND m.uid > 0
AND ($2::text = '' OR m.id > NULLIF($2::text, '')::uuid)
ORDER BY m.id
LIMIT $3
`, accountID, lastID, refetchBodiesBatchSize)
		if err != nil {
			return result, err
		}
		batch := make([]messageRef, 0, refetchBodiesBatchSize)
		for rows.Next() {
			var ref messageRef
			if err := rows.Scan(&ref.id, &ref.userID, &ref.folderName, &ref.uid); err != nil {
				rows.Close()
				return result, err
			}
			batch = append(batch, ref)
			lastID = ref.id
		}
		if err := rows.Err(); err != nil {
			rows.Close()
			return result, err
		}
		rows.Close()
		if len(batch) == 0 {
			break
		}
		byFolder := make(map[string][]messageRef)
		for _, ref := range batch {
			byFolder[ref.folderName] = append(byFolder[ref.folderName], ref)
		}
		if client == nil {
			addr := fmt.Sprintf("%s:%d", host, port)
			var c *imapclient.Client
			if useTLS {
				c, err = imapclient.DialTLS(addr, &imapclient.Options{})
			} else {
				c, err = imapclient.DialStartTLS(addr, &imapclient.Options{})
			}
			if err != nil {
				return result, fmt.Errorf("dial: %w", err)
			}
			cred, err := w.resolveCredential(ctx, accountID, creds)
			if err != nil {
				c.Close()
				return result, err
			}
			if err := connect.AuthenticateIMAP(c, cred); err != nil {
				c.Close()
				return result, fmt.Errorf("login: %w", err)
			}
			client = c
		}
		for folderName, refs := range byFolder {
			if _, err := client.Select(folderName, nil).Wait(); err != nil {
				w.logger.Warn("refetch bodies: select folder failed", "folder", folderName, "error", err)
				result.Scanned += len(refs)
				continue
			}
			for _, ref := range refs {
				result.Scanned++
				updated, err := w.refetchStoredMessageBody(ctx, client, ref.userID, ref.id, ref.uid)
				if err != nil {
					w.logger.Warn("refetch message body failed", "message_id", ref.id, "error", err)
					continue
				}
				if updated {
					result.Updated++
				}
			}
		}
		if len(batch) < refetchBodiesBatchSize {
			break
		}
	}
	return result, nil
}
// refetchStoredMessageBody downloads the raw message uid from the currently
// selected mailbox and re-derives body_text, body_html and snippet. The row is
// written only when a body differs; the snippet is replaced only when the new
// one is non-empty. Attachments are then re-stored.
//
// It reports whether the row was updated. An empty download counts as "no
// change". A failure to store attachments is returned as an error.
func (w *SyncWorker) refetchStoredMessageBody(ctx context.Context, client *imapclient.Client, userID, messageID string, uid uint32) (bool, error) {
	raw, err := fetchIMAPMessageRawBody(client, uid)
	switch {
	case err != nil:
		return false, err
	case len(raw) == 0:
		return false, nil
	}

	text, html := parseBody(raw)
	text = toValidUTF8(text)
	html = toValidUTF8(sanitize.SanitizeHTML(html))
	preview := SnippetFromBodies(text, html, 200)

	res, err := w.db.Exec(ctx, `
UPDATE messages
SET body_text = $2,
body_html = $3,
snippet = CASE WHEN $4 <> '' THEN $4 ELSE snippet END,
updated_at = NOW()
WHERE id = $1
AND (body_text IS DISTINCT FROM $2 OR body_html IS DISTINCT FROM $3)
`, messageID, text, html, preview)
	if err != nil {
		return false, err
	}
	if err := w.storeAttachments(ctx, userID, messageID, raw, true); err != nil {
		return false, err
	}
	changed := res.RowsAffected() > 0
	return changed, nil
}
// fetchIMAPMessageRawBody fetches the full body section (BODY[]) of the
// message with the given UID from the currently selected mailbox.
//
// It returns "message not found on server" when the server returns no message.
// If the message has no body literal, it returns an empty body with a nil
// error. The FETCH command is always closed, so the server's completion status
// is surfaced and the command is released. An incomplete literal read is an
// error, not a truncated body.
func fetchIMAPMessageRawBody(client *imapclient.Client, uid uint32) ([]byte, error) {
	uidSet := imap.UIDSet{}
	uidSet.AddNum(imap.UID(uid))
	fetchCmd := client.Fetch(uidSet, &imap.FetchOptions{
		UID:         true,
		BodySection: []*imap.FetchItemBodySection{{}},
	})
	var (
		body    []byte
		found   bool
		readErr error
	)
	if msg := fetchCmd.Next(); msg != nil {
		found = true
		for item := msg.Next(); item != nil; item = msg.Next() {
			data, ok := item.(imapclient.FetchItemDataBodySection)
			if !ok || data.Literal == nil {
				continue
			}
			var buf bytes.Buffer
			if _, err := buf.ReadFrom(data.Literal); err != nil {
				readErr = err
				break
			}
			body = buf.Bytes()
		}
	}
	// Close drains whatever is left and waits for the tagged response.
	closeErr := fetchCmd.Close()
	if readErr != nil {
		return nil, fmt.Errorf("read body literal: %w", readErr)
	}
	if closeErr != nil {
		return nil, fmt.Errorf("fetch uid %d: %w", uid, closeErr)
	}
	if !found {
		return nil, fmt.Errorf("message not found on server")
	}
	return body, nil
}
// isEmptyFromJSON reports whether a serialized from_addr carries no usable
// sender. This is true for empty input, "[]", "null", anything that does not
// decode as a list of EmailAddress, and lists whose entries all have blank
// (whitespace-only) names and addresses.
func isEmptyFromJSON(fromAddr []byte) bool {
	switch string(fromAddr) {
	case "", "[]", "null":
		return true
	}
	var parsed []EmailAddress
	if json.Unmarshal(fromAddr, &parsed) != nil {
		return true
	}
	for _, addr := range parsed {
		hasAddress := strings.TrimSpace(addr.Address) != ""
		hasName := strings.TrimSpace(addr.Name) != ""
		if hasAddress || hasName {
			return false
		}
	}
	return true
}
// accountFromJSON returns the account's own address serialized as a
// one-element []EmailAddress. It is used as a from_addr fallback. It returns
// (nil, nil) when the account's email is blank, and any query or Scan error
// unchanged.
func (w *SyncWorker) accountFromJSON(ctx context.Context, accountID string) ([]byte, error) {
	var self EmailAddress
	row := w.db.QueryRow(ctx, `SELECT email, name FROM mail_accounts WHERE id = $1`, accountID)
	if err := row.Scan(&self.Address, &self.Name); err != nil {
		return nil, err
	}
	if strings.TrimSpace(self.Address) == "" {
		return nil, nil
	}
	return json.Marshal([]EmailAddress{self})
}
// flagsToStrings converts IMAP flags to plain strings in the same order. The
// result is always non-nil (empty when flags is empty), so it is stored as an
// empty array rather than NULL.
func flagsToStrings(flags []imap.Flag) []string {
	names := make([]string, 0, len(flags))
	for _, flag := range flags {
		names = append(names, string(flag))
	}
	return names
}
func (w *SyncWorker) resolveCredential(ctx context.Context, accountID string, creds []byte) (credentials.Credential, error) {
if len(creds) == 0 {
return credentials.Credential{}, errors.New("missing credentials")
}
if !credentials.IsEncrypted(creds) {
return credentials.Credential{}, errors.New("plaintext credentials forbidden")
}
if w.credentials == nil {
return credentials.Credential{}, errors.New("credential manager not configured")
}
cred, err := w.credentials.DecryptCredential(creds)
if err != nil {
return credentials.Credential{}, err
}
if w.oauth != nil && cred.IsOAuth() {
return mailoauth.RefreshAccountCredential(ctx, w.db, w.credentials, w.oauth, accountID, cred)
}
return cred, nil
}
// truncate returns s cut to at most maxLen bytes without splitting a UTF-8
// sequence. When the cut would land inside a multi-byte rune, it backs up to
// that rune's first byte, so valid UTF-8 input yields valid UTF-8 output.
// A non-positive maxLen yields "" (the byte-slicing version panicked on
// negative values).
func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	if maxLen <= 0 {
		return ""
	}
	cut := maxLen
	// Bytes of the form 10xxxxxx are UTF-8 continuation bytes; never cut just
	// before one. A rune is at most 4 bytes, so back up at most 3 positions
	// (this also bounds the scan on invalid input).
	for cut > 0 && maxLen-cut < 3 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut]
}