ultisuite-backend/internal/mail/imap/sync.go
R3D347HR4Y bb5be669c1 Implement IMAP sync pipeline with rules and webhook support
- Introduced a new sync pipeline for IMAP that integrates a rules engine and webhook execution.
- Enhanced the `SyncWorker` to support attachment management and folder synchronization.
- Added functionality to detect special folder types (Sent, Drafts, Trash, Archive, Spam) during sync.
- Implemented a database schema for tracking rule executions and their outcomes.
- Created unit tests for the new rules engine and webhook execution logic.
- Updated migration scripts to accommodate new database structures for rule executions and folder states.
- Enhanced error handling and logging throughout the sync process for better observability.
2026-05-22 17:38:39 +02:00

516 lines
15 KiB
Go

package imap
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"strconv"
	"strings"
	"time"

	"github.com/emersion/go-imap/v2"
	"github.com/emersion/go-imap/v2/imapclient"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/ultisuite/ulti-backend/internal/mail/credentials"
	"github.com/ultisuite/ulti-backend/internal/mail/limits"
	"github.com/ultisuite/ulti-backend/internal/mail/rules"
	"github.com/ultisuite/ulti-backend/internal/mail/storage"
	"github.com/ultisuite/ulti-backend/internal/mail/threading"
	"github.com/ultisuite/ulti-backend/internal/observability"
	"github.com/ultisuite/ulti-backend/internal/realtime"
)
// SyncDeps carries the optional services wired into the IMAP sync worker.
// NewSyncWorker copies these fields into the worker as-is.
type SyncDeps struct {
	// Storage receives attachments extracted during sync; when nil,
	// attachment extraction and upload are skipped entirely.
	Storage *storage.Client
	// AttachBucket is the bucket name recorded in attachments.s3_bucket;
	// an empty value is recorded as "mail-attachments".
	AttachBucket string
	// Rules is passed to newSyncPipeline for post-sync event handling.
	Rules *rules.Engine
	// Hub is passed to newSyncPipeline for post-sync event handling.
	Hub *realtime.Hub
}
// SyncWorker synchronizes every active mail account (mail_accounts.is_active)
// from its IMAP server into the database, once at Start and then on every tick
// of interval. Construct it with NewSyncWorker.
type SyncWorker struct {
	// db is used for all account, folder, message and attachment queries.
	db *pgxpool.Pool
	// logger is tagged with component=imap-sync.
	logger *slog.Logger
	// interval is the ticker period between sync cycles.
	interval time.Duration
	// credentials decrypts the stored IMAP credential blobs.
	credentials *credentials.Manager
	// storage holds attachment objects; nil disables attachment handling.
	storage *storage.Client
	// attachBucket is recorded per attachment; "" means "mail-attachments".
	attachBucket string
	// pipeline receives post-sync events; nil-checked before every use.
	pipeline *syncPipeline
}
// NewSyncWorker builds a worker that syncs all active accounts every interval.
// Optional collaborators come from deps: Storage and AttachBucket are stored
// as-is, while Rules and Hub are handed to newSyncPipeline.
func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *credentials.Manager, deps SyncDeps) *SyncWorker {
	worker := &SyncWorker{
		db:          db,
		interval:    interval,
		credentials: credManager,
	}
	worker.logger = slog.Default().With("component", "imap-sync")
	worker.storage, worker.attachBucket = deps.Storage, deps.AttachBucket
	worker.pipeline = newSyncPipeline(db, deps.Rules, deps.Hub)
	return worker
}
// Start blocks until ctx is cancelled. It runs one sync cycle right away and
// then one more on every tick of w.interval; cycles run sequentially on the
// calling goroutine.
func (w *SyncWorker) Start(ctx context.Context) {
	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()
	w.logger.Info("imap sync worker started", "interval", w.interval)

	for {
		w.runSyncCycle(ctx)

		select {
		case <-ticker.C:
			// Next cycle.
		case <-ctx.Done():
			w.logger.Info("imap sync worker stopped")
			return
		}
	}
}
// runSyncCycle performs one pass over all accounts. It records the outcome
// ("success" or "error") and duration via observability.ObserveIMAPSync, and
// logs the error when the pass failed.
func (w *SyncWorker) runSyncCycle(ctx context.Context) {
	began := time.Now()
	syncErr := w.syncAllAccounts(ctx)

	outcome := "success"
	if syncErr != nil {
		outcome = "error"
	}
	observability.ObserveIMAPSync(outcome, time.Since(began))

	if syncErr != nil {
		w.logger.Error("sync cycle failed", "error", syncErr)
	}
}
// syncAllAccounts syncs every active mail account, one after another.
//
// The account list is read fully and the result set closed before any IMAP
// work starts. pgxpool keeps a connection checked out until the rows are
// closed, and holding it across slow, network-bound per-account syncs would
// pin a pool connection for the whole cycle.
//
// A failure to scan one row or to sync one account is logged and does not stop
// the others; it is summarized in the returned error. The loop stops early and
// returns ctx.Err() once ctx is cancelled.
func (w *SyncWorker) syncAllAccounts(ctx context.Context) error {
	type accountRow struct {
		id     string
		host   string
		port   int
		useTLS bool
		creds  []byte
	}

	rows, err := w.db.Query(ctx, `
		SELECT id, imap_host, imap_port, imap_tls, credentials
		FROM mail_accounts
		WHERE is_active = true
	`)
	if err != nil {
		return err
	}
	defer rows.Close()

	hasSyncError := false
	var accounts []accountRow
	for rows.Next() {
		var a accountRow
		if err := rows.Scan(&a.id, &a.host, &a.port, &a.useTLS, &a.creds); err != nil {
			w.logger.Error("failed to scan account", "error", err)
			hasSyncError = true
			continue
		}
		accounts = append(accounts, a)
	}
	// Release the connection before the long-running sync loop; Close is
	// idempotent, so the deferred call above is harmless.
	rows.Close()
	if err := rows.Err(); err != nil {
		return err
	}

	for _, a := range accounts {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := w.syncAccount(ctx, a.id, a.host, a.port, a.useTLS, a.creds); err != nil {
			w.logger.Error("sync failed", "account_id", a.id, "error", err)
			hasSyncError = true
		}
	}
	if hasSyncError {
		return errors.New("one or more account sync failed")
	}
	return nil
}
// syncAccount logs in to one account's IMAP server and syncs each mailbox
// that DetectFolderType classifies; mailboxes with an empty type are skipped.
// Per-folder failures are logged and do not abort the account.
// mail_accounts.last_sync_at is stamped only when every folder was visited;
// a cancelled ctx returns ctx.Err() without stamping it.
//
// useTLS selects implicit TLS; otherwise the connection is upgraded with
// STARTTLS. Credentials are decrypted before dialing so that an unusable
// credential blob does not cost a network round-trip.
func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, port int, useTLS bool, creds []byte) error {
	userID, err := w.accountUserID(ctx, accountID)
	if err != nil {
		return err
	}
	username, password, err := w.parseCredentials(creds)
	if err != nil {
		return fmt.Errorf("decrypt credentials: %w", err)
	}

	// JoinHostPort brackets IPv6 literals ("[::1]:993"); "%s:%d" did not.
	// Strip any brackets already present so they are not doubled.
	addr := net.JoinHostPort(strings.Trim(host, "[]"), strconv.Itoa(port))
	opts := &imapclient.Options{}
	var client *imapclient.Client
	if useTLS {
		client, err = imapclient.DialTLS(addr, opts)
	} else {
		client, err = imapclient.DialStartTLS(addr, opts)
	}
	if err != nil {
		return fmt.Errorf("dial: %w", err)
	}
	defer client.Close()

	if err := client.Login(username, password).Wait(); err != nil {
		return fmt.Errorf("login: %w", err)
	}

	mailboxes, err := client.List("", "*", &imap.ListOptions{ReturnSpecialUse: true}).Collect()
	if err != nil {
		return fmt.Errorf("list: %w", err)
	}
	for _, mbox := range mailboxes {
		if err := ctx.Err(); err != nil {
			return err
		}
		folderType := DetectFolderType(mbox.Mailbox, mbox.Attrs)
		if folderType == "" {
			continue
		}
		if err := w.syncFolder(ctx, client, accountID, userID, mbox.Mailbox, folderType); err != nil {
			w.logger.Error("folder sync failed", "account_id", accountID, "folder", mbox.Mailbox, "error", err)
		}
	}

	if _, err := w.db.Exec(ctx, `UPDATE mail_accounts SET last_sync_at = NOW() WHERE id = $1`, accountID); err != nil {
		return fmt.Errorf("update last_sync_at: %w", err)
	}
	return nil
}
// accountUserID looks up the owning user of a mail account. Scan errors,
// including pgx.ErrNoRows for an unknown account, are returned unchanged.
func (w *SyncWorker) accountUserID(ctx context.Context, accountID string) (string, error) {
	row := w.db.QueryRow(ctx, `SELECT user_id FROM mail_accounts WHERE id = $1`, accountID)
	var owner string
	if err := row.Scan(&owner); err != nil {
		return owner, err
	}
	return owner, nil
}
// syncFolder brings one remote mailbox in line with its local copy.
//
// Steps:
//  1. SELECT the mailbox with CONDSTORE.
//  2. Drop local messages if UIDVALIDITY changed since the last sync.
//  3. Upsert the mail_folders row.
//  4. Fetch messages newer than the last synced UID.
//  5. Re-fetch previously known messages whose MODSEQ advanced.
//  6. Reconcile remote deletions and persist the new sync state.
//
// Failures of the CONDSTORE pass (step 5) and of deletion reconciliation are
// logged rather than returned.
func (w *SyncWorker) syncFolder(ctx context.Context, client *imapclient.Client, accountID, userID, folderName, folderType string) error {
	selectData, err := client.Select(folderName, &imap.SelectOptions{CondStore: true}).Wait()
	if err != nil {
		return fmt.Errorf("select %s: %w", folderName, err)
	}
	displayName := DisplayName(folderName, folderType)
	prevState, hasPrev, err := loadFolderSyncState(ctx, w.db, accountID, folderName)
	if err != nil && !errors.Is(err, pgx.ErrNoRows) {
		return fmt.Errorf("load folder state: %w", err)
	}

	uidValidityChanged := hasPrev && prevState.UIDValidity != 0 && prevState.UIDValidity != selectData.UIDValidity
	if uidValidityChanged {
		if err := resetFolderMessages(ctx, w.db, prevState.FolderID); err != nil {
			return fmt.Errorf("reset folder after uidvalidity change: %w", err)
		}
		prevState.LastUID = 0
		prevState.HighestModSeq = 0
	}
	// Baseline for the CONDSTORE pass. Take it from the previous sync state,
	// not from the upsert: the upsert stores GREATEST(stored, current), which is
	// never below the server's value. The "modseq advanced" check would then
	// never succeed.
	baselineModSeq := prevState.HighestModSeq

	var folderID string
	err = w.db.QueryRow(ctx, `
		INSERT INTO mail_folders (account_id, name, remote_name, folder_type, uidvalidity, highest_modseq, message_count, last_uid)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
		ON CONFLICT (account_id, remote_name) DO UPDATE
		SET name = EXCLUDED.name,
			folder_type = EXCLUDED.folder_type,
			uidvalidity = EXCLUDED.uidvalidity,
			highest_modseq = GREATEST(mail_folders.highest_modseq, EXCLUDED.highest_modseq),
			message_count = EXCLUDED.message_count,
			updated_at = NOW()
		RETURNING id, last_uid
	`, accountID, displayName, folderName, folderType,
		selectData.UIDValidity, selectData.HighestModSeq, selectData.NumMessages, prevState.LastUID,
	).Scan(&folderID, &prevState.LastUID)
	if err != nil {
		return fmt.Errorf("upsert folder: %w", err)
	}
	if uidValidityChanged {
		// The ON CONFLICT branch leaves last_uid untouched. RETURNING therefore
		// hands back the high-water mark of the old UIDVALIDITY epoch, which no
		// longer means anything.
		prevState.LastUID = 0
	}

	if selectData.NumMessages == 0 {
		return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, 0, 0)
	}

	lastUID := prevState.LastUID
	// A UID range "N:*" always includes the highest-UID message, even when N is
	// larger than any assigned UID (RFC 3501 §6.4.8). Without this guard, every
	// cycle would re-fetch the newest message and report it as "updated".
	// Every existing UID is below UIDNEXT; fetch anyway if the server omitted it.
	if selectData.UIDNext == 0 || imap.UID(lastUID)+1 < selectData.UIDNext {
		if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, lastUID+1, 0, false); err != nil {
			return err
		}
	}

	savedModSeq := selectData.HighestModSeq
	if lastUID > 0 && baselineModSeq > 0 && selectData.HighestModSeq > baselineModSeq {
		// Limit the pass to messages known before this cycle (UID <= lastUID).
		// Messages fetched just above would otherwise be reported a second
		// time, as "updated".
		known := imap.UIDSet{}
		known.AddRange(1, imap.UID(lastUID))
		if err := w.fetchAndProcessSet(ctx, client, accountID, userID, folderID, known, baselineModSeq, true); err != nil {
			w.logger.Warn("condstore incremental fetch failed", "folder", folderName, "error", err)
			// Record the old baseline instead of the server's value so the same
			// change window is attempted again on the next cycle.
			savedModSeq = baselineModSeq
		}
	}

	if err := w.reconcileDeletions(ctx, client, folderID, userID, accountID); err != nil {
		w.logger.Warn("deletion reconcile failed", "folder", folderName, "error", err)
	}

	var maxUID uint32
	if err := w.db.QueryRow(ctx, `SELECT COALESCE(MAX(uid), 0) FROM messages WHERE folder_id = $1`, folderID).Scan(&maxUID); err != nil {
		// Don't persist a bogus 0 high-water mark; leave the previously saved
		// state in place.
		return fmt.Errorf("load max uid: %w", err)
	}
	return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, savedModSeq, maxUID, int(selectData.NumMessages))
}

// fetchAndProcess fetches every message with UID >= fromUID (the range
// "fromUID:*"; go-imap v2 encodes "*" as UID 0) and hands each one to
// processMessage. See fetchAndProcessSet for changedSince and updatesOnly.
func (w *SyncWorker) fetchAndProcess(ctx context.Context, client *imapclient.Client, accountID, userID, folderID string, fromUID uint32, changedSince uint64, updatesOnly bool) error {
	uids := imap.UIDSet{}
	uids.AddRange(imap.UID(fromUID), imap.UID(0))
	return w.fetchAndProcessSet(ctx, client, accountID, userID, folderID, uids, changedSince, updatesOnly)
}

// fetchAndProcessSet fetches UID, flags, envelope and the full body of every
// message in uids and stores each one via processMessage.
//
// A non-zero changedSince limits the fetch to messages whose MODSEQ is greater
// than it (CONDSTORE CHANGEDSINCE). updatesOnly is passed through to
// processMessage. Per-message failures are logged and skipped. Each message
// processMessage reports with a non-empty kind is forwarded to the post-sync
// pipeline.
//
// The loop stops early when ctx is cancelled. Close still finishes the
// command, and ctx.Err() is returned.
func (w *SyncWorker) fetchAndProcessSet(ctx context.Context, client *imapclient.Client, accountID, userID, folderID string, uids imap.UIDSet, changedSince uint64, updatesOnly bool) error {
	fetchOpts := &imap.FetchOptions{
		UID:          true,
		Flags:        true,
		Envelope:     true,
		ModSeq:       changedSince > 0,
		ChangedSince: changedSince,
		BodySection:  []*imap.FetchItemBodySection{{}},
	}
	fetchCmd := client.Fetch(uids, fetchOpts)
	for ctx.Err() == nil {
		msg := fetchCmd.Next()
		if msg == nil {
			break
		}
		kind, messageID, err := w.processMessage(ctx, msg, accountID, userID, folderID, updatesOnly)
		if err != nil {
			w.logger.Error("process message failed", "folder_id", folderID, "error", err)
			continue
		}
		if kind != "" && w.pipeline != nil {
			w.pipeline.handle(ctx, postSyncEvent{
				userID: userID, accountID: accountID, messageID: messageID, kind: kind,
			})
		}
	}
	closeErr := fetchCmd.Close()
	if err := ctx.Err(); err != nil {
		return err
	}
	return closeErr
}
// reconcileDeletions deletes local messages of folderID whose UID is no longer
// present on the server (the currently selected mailbox). It emits a "deleted"
// post-sync event for each one.
//
// It does nothing when the remote UID set comes back empty. uidSetToMap returns
// an empty map for a nil, non-UID or dynamic set, so an empty map cannot be
// told apart from an undecodable reply. Treating it as "everything was deleted"
// could wipe the whole local folder.
//
// Stale IDs are collected and the result set closed before any DELETE is
// issued, so the read does not hold a pool connection while the deletes and
// pipeline calls run.
func (w *SyncWorker) reconcileDeletions(ctx context.Context, client *imapclient.Client, folderID, userID, accountID string) error {
	searchData, err := client.UIDSearch(&imap.SearchCriteria{}, &imap.SearchOptions{ReturnAll: true}).Wait()
	if err != nil {
		return err
	}
	remoteUIDs := uidSetToMap(searchData.All)
	if len(remoteUIDs) == 0 {
		return nil
	}

	rows, err := w.db.Query(ctx, `SELECT id, uid FROM messages WHERE folder_id = $1`, folderID)
	if err != nil {
		return err
	}
	defer rows.Close()

	var staleIDs []string
	for rows.Next() {
		var messageID string
		var uid uint32
		if err := rows.Scan(&messageID, &uid); err != nil {
			return err
		}
		if !remoteUIDs[uid] {
			staleIDs = append(staleIDs, messageID)
		}
	}
	rows.Close()
	if err := rows.Err(); err != nil {
		return err
	}

	for _, messageID := range staleIDs {
		if _, err := w.db.Exec(ctx, `DELETE FROM messages WHERE id = $1`, messageID); err != nil {
			return err
		}
		if w.pipeline != nil {
			w.pipeline.handle(ctx, postSyncEvent{
				userID: userID, accountID: accountID, messageID: messageID, kind: "deleted",
			})
		}
	}
	return nil
}
// uidSetToMap expands a UID set into a membership map keyed by UID.
// It returns an empty (non-nil) map when set is nil, is not an imap.UIDSet,
// or cannot be enumerated (uidSet.Nums reports false).
func uidSetToMap(set imap.NumSet) map[uint32]bool {
	result := make(map[uint32]bool)
	// A nil interface fails the assertion too, covering the nil case.
	uidSet, isUIDSet := set.(imap.UIDSet)
	if !isUIDSet {
		return result
	}
	if nums, complete := uidSet.Nums(); complete {
		for _, n := range nums {
			result[uint32(n)] = true
		}
	}
	return result
}
// processMessage consumes one FETCH response and upserts it into messages.
//
// Steps:
//   - Collect the UID, flags, envelope and full body section from msg.
//   - Derive addresses, text/HTML bodies, a snippet and threading headers.
//   - Upsert the row keyed by (folder_id, uid) and assign its thread.
//   - Store its attachments; attachment failures are only logged.
//
// kind is "updated" when the row already existed or updatesOnly is set, and
// "created" otherwise. kind and messageID are both empty, with a nil error,
// when the response carried no envelope or no UID. A body that could not be
// read completely is reported as an error rather than stored truncated.
func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMessageData, accountID, userID, folderID string, updatesOnly bool) (kind, messageID string, err error) {
	var (
		envelope    *imap.Envelope
		uid         imap.UID
		flags       []imap.Flag
		bodyContent []byte
		bodyErr     error
	)
	// Keep consuming items after a body read failure so the rest of this
	// message's response is not left unread.
	for item := msg.Next(); item != nil; item = msg.Next() {
		switch data := item.(type) {
		case imapclient.FetchItemDataUID:
			uid = data.UID
		case imapclient.FetchItemDataFlags:
			flags = data.Flags
		case imapclient.FetchItemDataEnvelope:
			envelope = data.Envelope
		case imapclient.FetchItemDataBodySection:
			if data.Literal == nil {
				continue
			}
			// ReadFrom treats io.EOF as success and surfaces any other read
			// error, which a hand-rolled "stop on any error" loop would hide.
			var buf bytes.Buffer
			if _, readErr := buf.ReadFrom(data.Literal); readErr != nil {
				bodyErr = readErr
			}
			bodyContent = buf.Bytes()
		}
	}
	if bodyErr != nil {
		return "", "", fmt.Errorf("read body of uid %d: %w", uid, bodyErr)
	}
	if envelope == nil || uid == 0 {
		return "", "", nil
	}

	flagStrs := flagsToStrings(flags)
	fromAddr := addressesToJSON(envelope.From)
	toAddrs := addressesToJSON(envelope.To)
	ccAddrs := addressesToJSON(envelope.Cc)
	bodyText, bodyHTML := parseBody(bodyContent)
	snippet := truncate(bodyText, 200)

	// Prefer threading headers parsed from the raw message; fall back to the
	// envelope's In-Reply-To.
	headerRefs, headerInReplyTo := parseThreadHeaders(bodyContent)
	inReplyTo := headerInReplyTo
	if inReplyTo == "" && len(envelope.InReplyTo) > 0 {
		inReplyTo = threading.NormalizeMessageID(envelope.InReplyTo[0])
	}
	references := headerRefs
	if len(references) == 0 {
		references = threading.ParseMessageIDs(strings.Join(envelope.InReplyTo, " "))
	}

	// A failed lookup must not be mistaken for "new message": that would
	// misreport the event kind and bypass the attachment dedupe check.
	var existed bool
	if err := w.db.QueryRow(ctx, `
		SELECT EXISTS(SELECT 1 FROM messages WHERE folder_id = $1 AND uid = $2)
	`, folderID, uid).Scan(&existed); err != nil {
		return "", "", fmt.Errorf("check existing message: %w", err)
	}

	err = w.db.QueryRow(ctx, `
		INSERT INTO messages (account_id, folder_id, uid, message_id, subject, from_addr, to_addrs, cc_addrs, date, snippet, body_text, body_html, flags, in_reply_to, references_header)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
		ON CONFLICT (folder_id, uid) DO UPDATE SET
			message_id = EXCLUDED.message_id,
			subject = EXCLUDED.subject,
			from_addr = EXCLUDED.from_addr,
			to_addrs = EXCLUDED.to_addrs,
			cc_addrs = EXCLUDED.cc_addrs,
			date = EXCLUDED.date,
			snippet = EXCLUDED.snippet,
			body_text = EXCLUDED.body_text,
			body_html = EXCLUDED.body_html,
			flags = EXCLUDED.flags,
			in_reply_to = EXCLUDED.in_reply_to,
			references_header = EXCLUDED.references_header,
			updated_at = NOW()
		RETURNING id
	`, accountID, folderID, uid, envelope.MessageID, envelope.Subject,
		fromAddr, toAddrs, ccAddrs, envelope.Date, snippet, bodyText, bodyHTML, flagStrs, inReplyTo, references,
	).Scan(&messageID)
	if err != nil {
		return "", "", err
	}

	threadID, err := threading.AssignThreadID(ctx, w.db, accountID, inReplyTo, references)
	if err != nil {
		return "", "", err
	}
	if _, err = w.db.Exec(ctx, `UPDATE messages SET thread_id = $1, updated_at = NOW() WHERE id = $2`, threadID, messageID); err != nil {
		return "", "", err
	}

	if err := w.storeAttachments(ctx, userID, messageID, bodyContent, existed); err != nil {
		w.logger.Warn("attachment store failed", "message_id", messageID, "error", err)
	}

	if existed || updatesOnly {
		return "updated", messageID, nil
	}
	return "created", messageID, nil
}
// storeAttachments extracts attachments from the raw message, uploads each one
// to object storage and records it in the attachments table.
//
// It is a no-op when no storage client is configured or raw is empty. For a
// message that already existed, it returns early if attachment rows are
// already present. Parts rejected by limits.ValidateAttachmentSize are skipped
// with a warning.
//
// Parts sharing a filename get distinct object keys. Otherwise the second
// upload would overwrite the first while both rows pointed at the same object.
//
// When an upload or insert fails after earlier parts were stored, the message
// is still flagged has_attachments before the error is returned. A later sync
// sees the existing rows and returns early, so the flag would otherwise never
// be set.
func (w *SyncWorker) storeAttachments(ctx context.Context, userID, messageID string, raw []byte, messageExisted bool) error {
	if w.storage == nil || len(raw) == 0 {
		return nil
	}
	if messageExisted {
		var attCount int
		if err := w.db.QueryRow(ctx, `SELECT COUNT(*) FROM attachments WHERE message_id = $1`, messageID).Scan(&attCount); err != nil {
			// Proceeding would risk uploading duplicates of existing attachments.
			return fmt.Errorf("count attachments: %w", err)
		}
		if attCount > 0 {
			return nil
		}
	}

	parts, err := ExtractAttachments(raw)
	if err != nil || len(parts) == 0 {
		return err
	}
	bucket := w.attachBucket
	if bucket == "" {
		bucket = "mail-attachments"
	}

	usedNames := make(map[string]bool, len(parts))
	stored := 0
	var storeErr error
	for i, part := range parts {
		size := int64(len(part.Data))
		if err := limits.ValidateAttachmentSize(size); err != nil {
			w.logger.Warn("attachment skipped", "message_id", messageID, "filename", part.Filename, "size", size, "error", err)
			continue
		}
		// Disambiguate repeated (or empty) filenames by prefixing the part index.
		keyName := part.Filename
		for usedNames[keyName] {
			keyName = fmt.Sprintf("%d-%s", i, keyName)
		}
		usedNames[keyName] = true

		objectKey := storage.MessageObjectKey(userID, messageID, keyName)
		if err := w.storage.Put(ctx, objectKey, bytes.NewReader(part.Data), size, part.ContentType); err != nil {
			storeErr = fmt.Errorf("upload attachment %q: %w", part.Filename, err)
			break
		}
		_, err := w.db.Exec(ctx, `
			INSERT INTO attachments (message_id, filename, content_type, size, s3_bucket, s3_key, content_id, is_inline)
			VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
		`, messageID, part.Filename, part.ContentType, len(part.Data), bucket, objectKey, part.ContentID, part.IsInline)
		if err != nil {
			// Best-effort cleanup of the orphaned object; the insert error is what matters.
			_ = w.storage.Delete(ctx, objectKey)
			storeErr = fmt.Errorf("insert attachment %q: %w", part.Filename, err)
			break
		}
		stored++
	}

	if storeErr != nil && stored == 0 {
		return storeErr
	}
	if _, err := w.db.Exec(ctx, `UPDATE messages SET has_attachments = true, updated_at = NOW() WHERE id = $1`, messageID); err != nil {
		return errors.Join(storeErr, err)
	}
	return storeErr
}
// flagsToStrings converts IMAP flags to plain strings, preserving order.
// The result is always a non-nil slice, empty when flags is empty or nil.
func flagsToStrings(flags []imap.Flag) []string {
	names := make([]string, 0, len(flags))
	for _, flag := range flags {
		names = append(names, string(flag))
	}
	return names
}
// parseCredentials decrypts a stored credential blob into an IMAP username and
// password. The guards run in this order:
//   - An empty blob is rejected.
//   - A blob that credentials.IsEncrypted does not recognise is refused.
//   - A missing credential manager is rejected.
func (w *SyncWorker) parseCredentials(creds []byte) (string, string, error) {
	switch {
	case len(creds) == 0:
		return "", "", errors.New("missing credentials")
	case !credentials.IsEncrypted(creds):
		return "", "", errors.New("plaintext credentials forbidden")
	case w.credentials == nil:
		return "", "", errors.New("credential manager not configured")
	default:
		return w.credentials.Decrypt(creds)
	}
}
// truncate returns s shortened to at most maxLen bytes.
//
// The cut is moved back to the nearest rune boundary. A multi-byte UTF-8
// character is therefore never split, which would otherwise leave invalid
// UTF-8 in the stored snippet. Strings that already fit are returned
// unchanged; a non-positive maxLen yields "" (a negative one used to panic).
func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	// Ranging over a string visits the byte offset of each rune start; keep
	// the last one that does not exceed maxLen.
	cut := 0
	for i := range s {
		if i > maxLen {
			break
		}
		cut = i
	}
	return s[:cut]
}