Implement IMAP sync pipeline with rules and webhook support

- Introduced a new sync pipeline for IMAP that integrates a rules engine and webhook execution.
- Enhanced the `SyncWorker` to support attachment management and folder synchronization.
- Added functionality to detect special folder types (Sent, Drafts, Trash, Archive, Spam) during sync.
- Implemented a database schema for tracking rule executions and their outcomes.
- Created unit tests for the new rules engine and webhook execution logic.
- Updated migration scripts to accommodate new database structures for rule executions and folder states.
- Enhanced error handling and logging throughout the sync process for better observability.
This commit is contained in:
R3D347HR4Y 2026-05-22 17:38:39 +02:00
parent 4eadb91a64
commit bb5be669c1
13 changed files with 1395 additions and 99 deletions

View File

@ -34,8 +34,10 @@ import (
"github.com/ultisuite/ulti-backend/internal/envexpand"
mailcredentials "github.com/ultisuite/ulti-backend/internal/mail/credentials"
imapsync "github.com/ultisuite/ulti-backend/internal/mail/imap"
mailstorage "github.com/ultisuite/ulti-backend/internal/mail/storage"
"github.com/ultisuite/ulti-backend/internal/mail/rules"
"github.com/ultisuite/ulti-backend/internal/mail/smtp"
mailstorage "github.com/ultisuite/ulti-backend/internal/mail/storage"
"github.com/ultisuite/ulti-backend/internal/mail/webhooks"
"github.com/ultisuite/ulti-backend/internal/meet"
"github.com/ultisuite/ulti-backend/internal/nextcloud"
"github.com/ultisuite/ulti-backend/internal/observability"
@ -136,8 +138,15 @@ func main() {
hub := realtime.NewHub()
healthChecker := observability.NewHealthChecker(cfg, pool, rdb)
rulesEngine := rules.NewEngineWithWebhooks(pool, webhooks.NewExecutor(pool))
// Start background workers
go imapsync.NewSyncWorker(pool, cfg.MailSyncInterval, credentialManager).Start(ctx)
go imapsync.NewSyncWorker(pool, cfg.MailSyncInterval, credentialManager, imapsync.SyncDeps{
Storage: attachmentStorage,
AttachBucket: cfg.MailAttachmentsBucket,
Rules: rulesEngine,
Hub: hub,
}).Start(ctx)
sender := smtp.NewSender(pool, credentialManager)
smtpCircuit := smtp.NewCircuitBreaker(cfg.MailSMTPCircuitFailures, cfg.MailSMTPCircuitCooldown)

View File

@ -0,0 +1,159 @@
package imap
import (
"bytes"
"encoding/base64"
"io"
"mime"
"mime/multipart"
"mime/quotedprintable"
"net/mail"
"strings"
"github.com/ultisuite/ulti-backend/internal/mail/limits"
)
// AttachmentPart is a decoded MIME body part stored as an attachment or inline resource.
type AttachmentPart struct {
Filename string
ContentType string
ContentID string
IsInline bool
Data []byte
}
// ExtractAttachments parses raw RFC 822 message bytes and returns attachment parts.
// Body text/plain and text/html parts are skipped. Non-text parts are collected when
// Content-Disposition is attachment, or inline with a filename. Parts exceeding
// limits.MaxAttachmentBytes are skipped; collection stops at limits.MaxAttachmentsPerMessage.
func ExtractAttachments(raw []byte) ([]AttachmentPart, error) {
if len(raw) == 0 {
return nil, nil
}
msg, err := mail.ReadMessage(bytes.NewReader(raw))
if err != nil {
return nil, err
}
contentType := msg.Header.Get("Content-Type")
if contentType == "" {
return nil, nil
}
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil || !strings.HasPrefix(mediaType, "multipart/") {
return nil, nil
}
return extractAttachmentsFromMultipart(msg.Body, params["boundary"]), nil
}
func extractAttachmentsFromMultipart(r io.Reader, boundary string) []AttachmentPart {
var attachments []AttachmentPart
mr := multipart.NewReader(r, boundary)
for {
part, err := mr.NextPart()
if err != nil {
break
}
if len(attachments) >= limits.MaxAttachmentsPerMessage {
break
}
partType := part.Header.Get("Content-Type")
mediaType, params, _ := mime.ParseMediaType(partType)
if mediaType == "" {
mediaType = "application/octet-stream"
}
switch {
case mediaType == "text/plain", mediaType == "text/html":
continue
case strings.HasPrefix(mediaType, "multipart/"):
nested := extractAttachmentsFromMultipart(part, params["boundary"])
for _, att := range nested {
if len(attachments) >= limits.MaxAttachmentsPerMessage {
break
}
attachments = append(attachments, att)
}
default:
if att, ok := partToAttachment(part, mediaType, params); ok {
attachments = append(attachments, att)
}
}
}
return attachments
}
func partToAttachment(part *multipart.Part, mediaType string, typeParams map[string]string) (AttachmentPart, bool) {
if strings.HasPrefix(mediaType, "text/") {
return AttachmentPart{}, false
}
disposition, dispParams, _ := mime.ParseMediaType(part.Header.Get("Content-Disposition"))
filename := dispParams["filename"]
if filename == "" {
filename = typeParams["name"]
}
isInline := strings.EqualFold(disposition, "inline")
isAttachment := strings.EqualFold(disposition, "attachment")
if !isAttachment && !(isInline && filename != "") {
return AttachmentPart{}, false
}
data, err := io.ReadAll(part)
if err != nil {
return AttachmentPart{}, false
}
data, err = decodePartBody(part.Header.Get("Content-Transfer-Encoding"), data)
if err != nil || len(data) > limits.MaxAttachmentBytes {
return AttachmentPart{}, false
}
return AttachmentPart{
Filename: filename,
ContentType: mediaType,
ContentID: normalizeContentID(part.Header.Get("Content-ID")),
IsInline: isInline,
Data: data,
}, true
}
func normalizeContentID(raw string) string {
return strings.Trim(raw, "<> \t")
}
func decodePartBody(transferEncoding string, data []byte) ([]byte, error) {
switch strings.ToLower(strings.TrimSpace(transferEncoding)) {
case "base64":
return decodeBase64Body(data)
case "quoted-printable":
return io.ReadAll(quotedprintable.NewReader(bytes.NewReader(data)))
default:
return data, nil
}
}
func decodeBase64Body(data []byte) ([]byte, error) {
clean := bytes.Map(func(r rune) rune {
switch r {
case '\r', '\n', ' ', '\t':
return -1
default:
return r
}
}, data)
decoded := make([]byte, base64.StdEncoding.DecodedLen(len(clean)))
n, err := base64.StdEncoding.Decode(decoded, clean)
if err != nil {
return nil, err
}
return decoded[:n], nil
}

View File

@ -0,0 +1,157 @@
package imap
import (
"encoding/base64"
"strings"
"testing"
)
func TestExtractAttachments_plainAttachment(t *testing.T) {
pdfData := []byte("%PDF-1.4\n")
raw := buildMultipartMessage(t, "mixed", []mimePart{
{
contentType: "text/plain",
body: []byte("Hello world"),
},
{
contentType: "application/pdf; name=\"doc.pdf\"",
disposition: "attachment; filename=\"doc.pdf\"",
body: pdfData,
transferEnc: "base64",
},
})
attachments, err := ExtractAttachments(raw)
if err != nil {
t.Fatalf("ExtractAttachments() error = %v", err)
}
if len(attachments) != 1 {
t.Fatalf("len(attachments) = %d, want 1", len(attachments))
}
att := attachments[0]
if att.Filename != "doc.pdf" {
t.Fatalf("Filename = %q, want doc.pdf", att.Filename)
}
if att.ContentType != "application/pdf" {
t.Fatalf("ContentType = %q, want application/pdf", att.ContentType)
}
if att.IsInline {
t.Fatal("IsInline = true, want false")
}
if att.ContentID != "" {
t.Fatalf("ContentID = %q, want empty", att.ContentID)
}
if string(att.Data) != string(pdfData) {
t.Fatalf("Data = %q, want %q", att.Data, pdfData)
}
}
func TestExtractAttachments_inlineWithCID(t *testing.T) {
pngData := []byte{0x89, 'P', 'N', 'G', '\r', '\n', 0x1a, '\n'}
raw := buildMultipartMessage(t, "related", []mimePart{
{
contentType: "text/html",
body: []byte(`<html><body><img src="cid:logo@cid"></body></html>`),
},
{
contentType: "image/png; name=\"logo.png\"",
disposition: "inline; filename=\"logo.png\"",
contentID: "<logo@cid>",
body: pngData,
transferEnc: "base64",
},
})
attachments, err := ExtractAttachments(raw)
if err != nil {
t.Fatalf("ExtractAttachments() error = %v", err)
}
if len(attachments) != 1 {
t.Fatalf("len(attachments) = %d, want 1", len(attachments))
}
att := attachments[0]
if att.Filename != "logo.png" {
t.Fatalf("Filename = %q, want logo.png", att.Filename)
}
if att.ContentType != "image/png" {
t.Fatalf("ContentType = %q, want image/png", att.ContentType)
}
if !att.IsInline {
t.Fatal("IsInline = false, want true")
}
if att.ContentID != "logo@cid" {
t.Fatalf("ContentID = %q, want logo@cid", att.ContentID)
}
if string(att.Data) != string(pngData) {
t.Fatalf("Data = %q, want %q", att.Data, pngData)
}
}
func TestExtractAttachments_skipsBodyParts(t *testing.T) {
raw := buildMultipartMessage(t, "alternative", []mimePart{
{
contentType: "text/plain",
body: []byte("plain body"),
},
{
contentType: "text/html",
body: []byte("<p>html body</p>"),
},
})
attachments, err := ExtractAttachments(raw)
if err != nil {
t.Fatalf("ExtractAttachments() error = %v", err)
}
if len(attachments) != 0 {
t.Fatalf("len(attachments) = %d, want 0", len(attachments))
}
}
type mimePart struct {
contentType string
disposition string
contentID string
transferEnc string
body []byte
}
func buildMultipartMessage(t *testing.T, subtype string, parts []mimePart) []byte {
t.Helper()
const boundary = "test-boundary"
var b strings.Builder
b.WriteString("From: sender@example.com\r\n")
b.WriteString("To: recipient@example.com\r\n")
b.WriteString("Subject: attachment test\r\n")
b.WriteString("MIME-Version: 1.0\r\n")
b.WriteString("Content-Type: multipart/" + subtype + "; boundary=\"" + boundary + "\"\r\n")
b.WriteString("\r\n")
for _, part := range parts {
b.WriteString("--" + boundary + "\r\n")
b.WriteString("Content-Type: " + part.contentType + "\r\n")
if part.disposition != "" {
b.WriteString("Content-Disposition: " + part.disposition + "\r\n")
}
if part.contentID != "" {
b.WriteString("Content-ID: " + part.contentID + "\r\n")
}
if part.transferEnc != "" {
b.WriteString("Content-Transfer-Encoding: " + part.transferEnc + "\r\n")
}
b.WriteString("\r\n")
if part.transferEnc == "base64" {
b.WriteString(base64.StdEncoding.EncodeToString(part.body))
} else {
b.Write(part.body)
}
b.WriteString("\r\n")
}
b.WriteString("--" + boundary + "--\r\n")
return []byte(b.String())
}

View File

@ -0,0 +1,110 @@
package imap
import (
"strings"
"github.com/emersion/go-imap/v2"
)
var folderDisplayNames = map[string]string{
"inbox": "Inbox",
"sent": "Sent",
"drafts": "Drafts",
"trash": "Trash",
"archive": "Archive",
"spam": "Spam",
}
var folderNameHeuristics = map[string]string{
"inbox": "inbox",
"sent": "sent",
"sent items": "sent",
"sent messages": "sent",
"[gmail]/sent mail": "sent",
"inbox.sent": "sent",
"drafts": "drafts",
"[gmail]/drafts": "drafts",
"trash": "trash",
"deleted": "trash",
"[gmail]/trash": "trash",
"archive": "archive",
"all mail": "archive",
"[gmail]/all mail": "archive",
"junk": "spam",
"spam": "spam",
"[gmail]/spam": "spam",
}
// DetectFolderType classifies an IMAP mailbox for sync using SPECIAL-USE
// attributes when present, otherwise name heuristics. NoSelect mailboxes
// return "" so callers can skip sync.
func DetectFolderType(mailbox string, attrs []imap.MailboxAttr) string {
for _, attr := range attrs {
if attr == imap.MailboxAttrNoSelect {
return ""
}
}
if folderType := folderTypeFromAttrs(attrs); folderType != "" {
return folderType
}
return folderTypeFromName(mailbox)
}
// DisplayName returns a user-facing folder label from the detected type,
// or the last path segment of mailbox when type is custom or unknown.
func DisplayName(mailbox, folderType string) string {
folderType = strings.ToLower(strings.TrimSpace(folderType))
if name, ok := folderDisplayNames[folderType]; ok {
return name
}
return mailboxLeaf(mailbox)
}
func folderTypeFromAttrs(attrs []imap.MailboxAttr) string {
for _, attr := range attrs {
switch attr {
case imap.MailboxAttrSent:
return "sent"
case imap.MailboxAttrDrafts:
return "drafts"
case imap.MailboxAttrTrash:
return "trash"
case imap.MailboxAttrArchive:
return "archive"
case imap.MailboxAttrJunk:
return "spam"
}
}
return ""
}
func folderTypeFromName(mailbox string) string {
full := strings.ToLower(strings.TrimSpace(mailbox))
if folderType, ok := folderNameHeuristics[full]; ok {
return folderType
}
leaf := strings.ToLower(mailboxLeaf(mailbox))
if folderType, ok := folderNameHeuristics[leaf]; ok {
return folderType
}
return "custom"
}
func mailboxLeaf(mailbox string) string {
mailbox = strings.TrimSpace(mailbox)
if mailbox == "" {
return ""
}
leaf := mailbox
for _, sep := range []string{"/", "."} {
if idx := strings.LastIndex(leaf, sep); idx >= 0 && idx < len(leaf)-len(sep) {
leaf = leaf[idx+len(sep):]
}
}
return leaf
}

View File

@ -0,0 +1,243 @@
package imap
import (
"testing"
"github.com/emersion/go-imap/v2"
)
func TestDetectFolderType(t *testing.T) {
tests := []struct {
name string
mailbox string
attrs []imap.MailboxAttr
want string
}{
{
name: "noselect returns empty",
mailbox: "Notes",
attrs: []imap.MailboxAttr{imap.MailboxAttrNoSelect},
want: "",
},
{
name: "noselect skipped even with sent name",
mailbox: "Sent",
attrs: []imap.MailboxAttr{imap.MailboxAttrNoSelect, imap.MailboxAttrSent},
want: "",
},
{
name: "attr sent",
mailbox: "Outbox",
attrs: []imap.MailboxAttr{imap.MailboxAttrSent},
want: "sent",
},
{
name: "attr drafts",
mailbox: "Brouillons",
attrs: []imap.MailboxAttr{imap.MailboxAttrDrafts},
want: "drafts",
},
{
name: "attr trash",
mailbox: "Corbeille",
attrs: []imap.MailboxAttr{imap.MailboxAttrTrash},
want: "trash",
},
{
name: "attr archive",
mailbox: "Old",
attrs: []imap.MailboxAttr{imap.MailboxAttrArchive},
want: "archive",
},
{
name: "attr junk",
mailbox: "Bulk",
attrs: []imap.MailboxAttr{imap.MailboxAttrJunk},
want: "spam",
},
{
name: "attrs beat heuristics",
mailbox: "INBOX",
attrs: []imap.MailboxAttr{imap.MailboxAttrSent},
want: "sent",
},
{
name: "inbox heuristic",
mailbox: "INBOX",
attrs: nil,
want: "inbox",
},
{
name: "inbox lowercase",
mailbox: "inbox",
attrs: nil,
want: "inbox",
},
{
name: "sent heuristic",
mailbox: "Sent",
attrs: nil,
want: "sent",
},
{
name: "sent items heuristic",
mailbox: "Sent Items",
attrs: nil,
want: "sent",
},
{
name: "sent messages heuristic",
mailbox: "Sent Messages",
attrs: nil,
want: "sent",
},
{
name: "gmail sent heuristic",
mailbox: "[Gmail]/Sent Mail",
attrs: nil,
want: "sent",
},
{
name: "inbox dot sent heuristic",
mailbox: "INBOX.Sent",
attrs: nil,
want: "sent",
},
{
name: "drafts heuristic",
mailbox: "Drafts",
attrs: nil,
want: "drafts",
},
{
name: "gmail drafts heuristic",
mailbox: "[Gmail]/Drafts",
attrs: nil,
want: "drafts",
},
{
name: "trash heuristic",
mailbox: "Trash",
attrs: nil,
want: "trash",
},
{
name: "deleted heuristic",
mailbox: "Deleted",
attrs: nil,
want: "trash",
},
{
name: "gmail trash heuristic",
mailbox: "[Gmail]/Trash",
attrs: nil,
want: "trash",
},
{
name: "archive heuristic",
mailbox: "Archive",
attrs: nil,
want: "archive",
},
{
name: "all mail heuristic",
mailbox: "All Mail",
attrs: nil,
want: "archive",
},
{
name: "gmail all mail heuristic",
mailbox: "[Gmail]/All Mail",
attrs: nil,
want: "archive",
},
{
name: "junk heuristic",
mailbox: "Junk",
attrs: nil,
want: "spam",
},
{
name: "spam heuristic",
mailbox: "Spam",
attrs: nil,
want: "spam",
},
{
name: "gmail spam heuristic",
mailbox: "[Gmail]/Spam",
attrs: nil,
want: "spam",
},
{
name: "custom folder",
mailbox: "Work/Projects",
attrs: nil,
want: "custom",
},
{
name: "leaf sent in hierarchy",
mailbox: "Accounts/Sent",
attrs: nil,
want: "sent",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := DetectFolderType(tt.mailbox, tt.attrs)
if got != tt.want {
t.Fatalf("DetectFolderType(%q, %v) = %q, want %q", tt.mailbox, tt.attrs, got, tt.want)
}
})
}
}
func TestDisplayName(t *testing.T) {
tests := []struct {
name string
mailbox string
folderType string
want string
}{
{name: "inbox type", mailbox: "INBOX", folderType: "inbox", want: "Inbox"},
{name: "sent type", mailbox: "[Gmail]/Sent Mail", folderType: "sent", want: "Sent"},
{name: "drafts type", mailbox: "Drafts", folderType: "drafts", want: "Drafts"},
{name: "trash type", mailbox: "Trash", folderType: "trash", want: "Trash"},
{name: "archive type", mailbox: "All Mail", folderType: "archive", want: "Archive"},
{name: "spam type", mailbox: "Junk", folderType: "spam", want: "Spam"},
{
name: "custom uses leaf",
mailbox: "Work/Projects",
folderType: "custom",
want: "Projects",
},
{
name: "unknown type uses leaf",
mailbox: "Clients/ACME",
folderType: "",
want: "ACME",
},
{
name: "dot hierarchy leaf",
mailbox: "INBOX.Work",
folderType: "custom",
want: "Work",
},
{
name: "custom type casing ignored",
mailbox: "Reports/Q1",
folderType: "CUSTOM",
want: "Q1",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := DisplayName(tt.mailbox, tt.folderType)
if got != tt.want {
t.Fatalf("DisplayName(%q, %q) = %q, want %q", tt.mailbox, tt.folderType, got, tt.want)
}
})
}
}

View File

@ -0,0 +1,131 @@
package imap
import (
"context"
"encoding/json"
"log/slog"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/rules"
"github.com/ultisuite/ulti-backend/internal/realtime"
)
type postSyncEvent struct {
userID string
accountID string
messageID string
kind string // created | updated | deleted
}
// syncPipeline runs rules and realtime notifications after message sync.
type syncPipeline struct {
db *pgxpool.Pool
logger *slog.Logger
rules *rules.Engine
hub *realtime.Hub
}
func newSyncPipeline(db *pgxpool.Pool, rulesEngine *rules.Engine, hub *realtime.Hub) *syncPipeline {
return &syncPipeline{
db: db,
logger: slog.Default().With("component", "imap-pipeline"),
rules: rulesEngine,
hub: hub,
}
}
func (p *syncPipeline) handle(ctx context.Context, ev postSyncEvent) {
if ev.kind == "deleted" {
p.broadcast(ev, realtime.Event{
Type: "mail.deleted",
Payload: map[string]any{
"message_id": ev.messageID,
"account_id": ev.accountID,
},
})
return
}
if p.rules != nil && ev.kind == "created" {
msg, err := p.loadRuleMessage(ctx, ev.messageID)
if err != nil {
p.logger.Error("load message for rules", "message_id", ev.messageID, "error", err)
} else if err := p.rules.EvaluateMessage(ctx, ev.userID, msg); err != nil {
p.logger.Error("rules evaluation failed", "message_id", ev.messageID, "error", err)
}
}
eventType := "mail.updated"
if ev.kind == "created" {
eventType = "mail.created"
}
p.broadcast(ev, realtime.Event{
Type: eventType,
Payload: map[string]any{
"message_id": ev.messageID,
"account_id": ev.accountID,
},
})
}
func (p *syncPipeline) broadcast(ev postSyncEvent, event realtime.Event) {
if p.hub == nil || ev.userID == "" {
return
}
p.hub.Broadcast(ev.userID, event)
}
func (p *syncPipeline) loadRuleMessage(ctx context.Context, messageID string) (*rules.Message, error) {
var (
fromJSON []byte
toJSON []byte
subject string
bodyText string
hasAtt bool
)
err := p.db.QueryRow(ctx, `
SELECT from_addr, to_addrs, subject, body_text, has_attachments
FROM messages WHERE id = $1
`, messageID).Scan(&fromJSON, &toJSON, &subject, &bodyText, &hasAtt)
if err != nil {
return nil, err
}
from := firstAddressString(fromJSON)
to := addressListStrings(toJSON)
return &rules.Message{
ID: messageID,
From: from,
To: to,
Subject: subject,
BodyText: bodyText,
HasAttachments: hasAtt,
}, nil
}
func firstAddressString(fromJSON []byte) string {
var addrs []EmailAddress
if err := json.Unmarshal(fromJSON, &addrs); err != nil || len(addrs) == 0 {
return string(fromJSON)
}
a := addrs[0]
if a.Name != "" {
return a.Name + " <" + a.Address + ">"
}
return a.Address
}
func addressListStrings(toJSON []byte) []string {
var addrs []EmailAddress
if err := json.Unmarshal(toJSON, &addrs); err != nil {
return nil
}
out := make([]string, 0, len(addrs))
for _, a := range addrs {
if a.Address != "" {
out = append(out, a.Address)
}
}
return out
}

View File

@ -1,6 +1,7 @@
package imap
import (
"bytes"
"context"
"errors"
"fmt"
@ -13,23 +14,41 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/credentials"
"github.com/ultisuite/ulti-backend/internal/mail/limits"
"github.com/ultisuite/ulti-backend/internal/mail/rules"
"github.com/ultisuite/ulti-backend/internal/mail/storage"
"github.com/ultisuite/ulti-backend/internal/mail/threading"
"github.com/ultisuite/ulti-backend/internal/observability"
"github.com/ultisuite/ulti-backend/internal/realtime"
)
type SyncWorker struct {
db *pgxpool.Pool
logger *slog.Logger
interval time.Duration
credentials *credentials.Manager
// SyncDeps optional services wired into the IMAP sync worker.
type SyncDeps struct {
Storage *storage.Client
AttachBucket string
Rules *rules.Engine
Hub *realtime.Hub
}
func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *credentials.Manager) *SyncWorker {
type SyncWorker struct {
db *pgxpool.Pool
logger *slog.Logger
interval time.Duration
credentials *credentials.Manager
storage *storage.Client
attachBucket string
pipeline *syncPipeline
}
func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *credentials.Manager, deps SyncDeps) *SyncWorker {
return &SyncWorker{
db: db,
logger: slog.Default().With("component", "imap-sync"),
interval: interval,
credentials: credManager,
db: db,
logger: slog.Default().With("component", "imap-sync"),
interval: interval,
credentials: credManager,
storage: deps.Storage,
attachBucket: deps.AttachBucket,
pipeline: newSyncPipeline(db, deps.Rules, deps.Hub),
}
}
@ -38,8 +57,6 @@ func (w *SyncWorker) Start(ctx context.Context) {
defer ticker.Stop()
w.logger.Info("imap sync worker started", "interval", w.interval)
// Initial sync
w.runSyncCycle(ctx)
for {
@ -65,12 +82,11 @@ func (w *SyncWorker) runSyncCycle(ctx context.Context) {
func (w *SyncWorker) syncAllAccounts(ctx context.Context) error {
rows, err := w.db.Query(ctx, `
SELECT id, imap_host, imap_port, imap_tls, credentials, sync_state
SELECT id, imap_host, imap_port, imap_tls, credentials
FROM mail_accounts
WHERE is_active = true
`)
if err != nil {
w.logger.Error("failed to query accounts", "error", err)
return err
}
defer rows.Close()
@ -83,15 +99,13 @@ func (w *SyncWorker) syncAllAccounts(ctx context.Context) error {
port int
useTLS bool
creds []byte
syncState []byte
)
if err := rows.Scan(&accountID, &host, &port, &useTLS, &creds, &syncState); err != nil {
if err := rows.Scan(&accountID, &host, &port, &useTLS, &creds); err != nil {
w.logger.Error("failed to scan account", "error", err)
hasSyncError = true
continue
}
if err := w.syncAccount(ctx, accountID, host, port, useTLS, creds, syncState); err != nil {
if err := w.syncAccount(ctx, accountID, host, port, useTLS, creds); err != nil {
w.logger.Error("sync failed", "account_id", accountID, "error", err)
hasSyncError = true
}
@ -105,13 +119,15 @@ func (w *SyncWorker) syncAllAccounts(ctx context.Context) error {
return nil
}
func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, port int, useTLS bool, creds, syncState []byte) error {
func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, port int, useTLS bool, creds []byte) error {
userID, err := w.accountUserID(ctx, accountID)
if err != nil {
return err
}
addr := fmt.Sprintf("%s:%d", host, port)
var client *imapclient.Client
var err error
opts := &imapclient.Options{}
var client *imapclient.Client
if useTLS {
client, err = imapclient.DialTLS(addr, opts)
} else {
@ -126,86 +142,199 @@ func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, po
if err != nil {
return fmt.Errorf("decrypt credentials: %w", err)
}
if err := client.Login(username, password).Wait(); err != nil {
return fmt.Errorf("login: %w", err)
}
// List mailboxes
mailboxes, err := client.List("", "*", nil).Collect()
listOpts := &imap.ListOptions{ReturnSpecialUse: true}
mailboxes, err := client.List("", "*", listOpts).Collect()
if err != nil {
return fmt.Errorf("list: %w", err)
}
for _, mbox := range mailboxes {
if err := w.syncFolder(ctx, client, accountID, mbox.Mailbox); err != nil {
folderType := DetectFolderType(mbox.Mailbox, mbox.Attrs)
if folderType == "" {
continue
}
if err := w.syncFolder(ctx, client, accountID, userID, mbox.Mailbox, folderType); err != nil {
w.logger.Error("folder sync failed", "account_id", accountID, "folder", mbox.Mailbox, "error", err)
}
}
// Update last sync time
_, err = w.db.Exec(ctx, `UPDATE mail_accounts SET last_sync_at = NOW() WHERE id = $1`, accountID)
return err
}
func (w *SyncWorker) syncFolder(ctx context.Context, client *imapclient.Client, accountID, folderName string) error {
selectData, err := client.Select(folderName, nil).Wait()
func (w *SyncWorker) accountUserID(ctx context.Context, accountID string) (string, error) {
var userID string
err := w.db.QueryRow(ctx, `SELECT user_id FROM mail_accounts WHERE id = $1`, accountID).Scan(&userID)
return userID, err
}
func (w *SyncWorker) syncFolder(ctx context.Context, client *imapclient.Client, accountID, userID, folderName, folderType string) error {
selectData, err := client.Select(folderName, &imap.SelectOptions{CondStore: true}).Wait()
if err != nil {
return fmt.Errorf("select %s: %w", folderName, err)
}
// Upsert folder record
displayName := DisplayName(folderName, folderType)
prevState, hasPrev, err := loadFolderSyncState(ctx, w.db, accountID, folderName)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("load folder state: %w", err)
}
var folderID string
if hasPrev && prevState.UIDValidity != 0 && prevState.UIDValidity != selectData.UIDValidity {
if err := resetFolderMessages(ctx, w.db, prevState.FolderID); err != nil {
return fmt.Errorf("reset folder after uidvalidity change: %w", err)
}
prevState.LastUID = 0
prevState.HighestModSeq = 0
}
err = w.db.QueryRow(ctx, `
INSERT INTO mail_folders (account_id, name, remote_name, uidvalidity, message_count)
VALUES ($1, $2, $2, $3, $4)
INSERT INTO mail_folders (account_id, name, remote_name, folder_type, uidvalidity, highest_modseq, message_count, last_uid)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (account_id, remote_name) DO UPDATE
SET uidvalidity = EXCLUDED.uidvalidity,
SET name = EXCLUDED.name,
folder_type = EXCLUDED.folder_type,
uidvalidity = EXCLUDED.uidvalidity,
highest_modseq = GREATEST(mail_folders.highest_modseq, EXCLUDED.highest_modseq),
message_count = EXCLUDED.message_count,
updated_at = NOW()
RETURNING id
`, accountID, folderName, selectData.UIDValidity, selectData.NumMessages).Scan(&folderID)
RETURNING id, last_uid, highest_modseq
`, accountID, displayName, folderName, folderType,
selectData.UIDValidity, selectData.HighestModSeq, selectData.NumMessages, prevState.LastUID,
).Scan(&folderID, &prevState.LastUID, &prevState.HighestModSeq)
if err != nil {
return fmt.Errorf("upsert folder: %w", err)
}
prevState.FolderID = folderID
prevState.UIDValidity = selectData.UIDValidity
if selectData.NumMessages == 0 {
return nil
return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, 0, 0)
}
// Get highest UID we already have for this folder
var lastUID uint32
_ = w.db.QueryRow(ctx, `
SELECT COALESCE(MAX(uid), 0) FROM messages WHERE folder_id = $1
`, folderID).Scan(&lastUID)
lastUID := prevState.LastUID
if lastUID > 0 {
if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, lastUID+1, 0, false); err != nil {
return err
}
} else {
if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, 1, 0, false); err != nil {
return err
}
}
// Fetch messages newer than our last UID
if selectData.HighestModSeq > 0 && prevState.HighestModSeq > 0 && selectData.HighestModSeq > prevState.HighestModSeq {
if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, 1, prevState.HighestModSeq, true); err != nil {
w.logger.Warn("condstore incremental fetch failed", "folder", folderName, "error", err)
}
}
if err := w.reconcileDeletions(ctx, client, folderID, userID, accountID); err != nil {
w.logger.Warn("deletion reconcile failed", "folder", folderName, "error", err)
}
var maxUID uint32
_ = w.db.QueryRow(ctx, `SELECT COALESCE(MAX(uid), 0) FROM messages WHERE folder_id = $1`, folderID).Scan(&maxUID)
return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, maxUID, int(selectData.NumMessages))
}
func (w *SyncWorker) fetchAndProcess(ctx context.Context, client *imapclient.Client, accountID, userID, folderID string, fromUID uint32, changedSince uint64, updatesOnly bool) error {
seqSet := imap.UIDSet{}
seqSet.AddRange(imap.UID(lastUID+1), imap.UID(0)) // lastUID+1 to *
seqSet.AddRange(imap.UID(fromUID), imap.UID(0))
fetchOpts := &imap.FetchOptions{
UID: true,
Flags: true,
Envelope: true,
ModSeq: changedSince > 0,
BodySection: []*imap.FetchItemBodySection{{}},
}
if changedSince > 0 {
fetchOpts.ChangedSince = changedSince
}
fetchCmd := client.Fetch(seqSet, fetchOpts)
for {
msg := fetchCmd.Next()
if msg == nil {
break
}
if err := w.processMessage(ctx, msg, accountID, folderID); err != nil {
w.logger.Error("process message failed", "folder", folderName, "error", err)
kind, messageID, err := w.processMessage(ctx, msg, accountID, userID, folderID, updatesOnly)
if err != nil {
w.logger.Error("process message failed", "folder_id", folderID, "error", err)
continue
}
if kind != "" && w.pipeline != nil {
w.pipeline.handle(ctx, postSyncEvent{
userID: userID, accountID: accountID, messageID: messageID, kind: kind,
})
}
}
return fetchCmd.Close()
}
func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMessageData, accountID, folderID string) error {
func (w *SyncWorker) reconcileDeletions(ctx context.Context, client *imapclient.Client, folderID, userID, accountID string) error {
searchData, err := client.UIDSearch(&imap.SearchCriteria{}, &imap.SearchOptions{ReturnAll: true}).Wait()
if err != nil {
return err
}
remoteUIDs := uidSetToMap(searchData.All)
if len(remoteUIDs) == 0 && searchData.Count == 0 {
return nil
}
rows, err := w.db.Query(ctx, `SELECT id, uid FROM messages WHERE folder_id = $1`, folderID)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var messageID string
var uid uint32
if err := rows.Scan(&messageID, &uid); err != nil {
return err
}
if !remoteUIDs[uid] {
if _, err := w.db.Exec(ctx, `DELETE FROM messages WHERE id = $1`, messageID); err != nil {
return err
}
if w.pipeline != nil {
w.pipeline.handle(ctx, postSyncEvent{
userID: userID, accountID: accountID, messageID: messageID, kind: "deleted",
})
}
}
}
return rows.Err()
}
func uidSetToMap(set imap.NumSet) map[uint32]bool {
out := make(map[uint32]bool)
if set == nil {
return out
}
uidSet, ok := set.(imap.UIDSet)
if !ok {
return out
}
uids, ok := uidSet.Nums()
if !ok {
return out
}
for _, uid := range uids {
out[uint32(uid)] = true
}
return out
}
func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMessageData, accountID, userID, folderID string, updatesOnly bool) (kind, messageID string, err error) {
var envelope *imap.Envelope
var uid imap.UID
var flags []imap.Flag
@ -240,19 +369,14 @@ func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMe
}
}
if envelope == nil {
return nil
}
flagStrs := make([]string, len(flags))
for i, f := range flags {
flagStrs[i] = string(f)
if envelope == nil || uid == 0 {
return "", "", nil
}
flagStrs := flagsToStrings(flags)
fromAddr := addressesToJSON(envelope.From)
toAddrs := addressesToJSON(envelope.To)
ccAddrs := addressesToJSON(envelope.Cc)
bodyText, bodyHTML := parseBody(bodyContent)
snippet := truncate(bodyText, 200)
@ -266,29 +390,110 @@ func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMe
references = threading.ParseMessageIDs(strings.Join(envelope.InReplyTo, " "))
}
var rowID string
err := w.db.QueryRow(ctx, `
var existed bool
_ = w.db.QueryRow(ctx, `
SELECT EXISTS(SELECT 1 FROM messages WHERE folder_id = $1 AND uid = $2)
`, folderID, uid).Scan(&existed)
err = w.db.QueryRow(ctx, `
INSERT INTO messages (account_id, folder_id, uid, message_id, subject, from_addr, to_addrs, cc_addrs, date, snippet, body_text, body_html, flags, in_reply_to, references_header)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
ON CONFLICT (folder_id, uid) DO NOTHING
ON CONFLICT (folder_id, uid) DO UPDATE SET
message_id = EXCLUDED.message_id,
subject = EXCLUDED.subject,
from_addr = EXCLUDED.from_addr,
to_addrs = EXCLUDED.to_addrs,
cc_addrs = EXCLUDED.cc_addrs,
date = EXCLUDED.date,
snippet = EXCLUDED.snippet,
body_text = EXCLUDED.body_text,
body_html = EXCLUDED.body_html,
flags = EXCLUDED.flags,
in_reply_to = EXCLUDED.in_reply_to,
references_header = EXCLUDED.references_header,
updated_at = NOW()
RETURNING id
`, accountID, folderID, uid, envelope.MessageID, envelope.Subject,
fromAddr, toAddrs, ccAddrs, envelope.Date, snippet, bodyText, bodyHTML, flagStrs, inReplyTo, references).Scan(&rowID)
fromAddr, toAddrs, ccAddrs, envelope.Date, snippet, bodyText, bodyHTML, flagStrs, inReplyTo, references,
).Scan(&messageID)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil
}
return err
return "", "", err
}
threadID, err := threading.AssignThreadID(ctx, w.db, accountID, inReplyTo, references)
if err != nil {
return "", "", err
}
_, err = w.db.Exec(ctx, `UPDATE messages SET thread_id = $1, updated_at = NOW() WHERE id = $2`, threadID, messageID)
if err != nil {
return "", "", err
}
if err := w.storeAttachments(ctx, userID, messageID, bodyContent, existed); err != nil {
w.logger.Warn("attachment store failed", "message_id", messageID, "error", err)
}
if existed {
return "updated", messageID, nil
}
if updatesOnly {
return "updated", messageID, nil
}
return "created", messageID, nil
}
func (w *SyncWorker) storeAttachments(ctx context.Context, userID, messageID string, raw []byte, messageExisted bool) error {
if w.storage == nil || len(raw) == 0 {
return nil
}
if messageExisted {
var attCount int
_ = w.db.QueryRow(ctx, `SELECT COUNT(*) FROM attachments WHERE message_id = $1`, messageID).Scan(&attCount)
if attCount > 0 {
return nil
}
}
parts, err := ExtractAttachments(raw)
if err != nil || len(parts) == 0 {
return err
}
_, err = w.db.Exec(ctx, `UPDATE messages SET thread_id = $1, updated_at = NOW() WHERE id = $2`, threadID, rowID)
bucket := w.attachBucket
if bucket == "" {
bucket = "mail-attachments"
}
for _, part := range parts {
if err := limits.ValidateAttachmentSize(int64(len(part.Data))); err != nil {
continue
}
objectKey := storage.MessageObjectKey(userID, messageID, part.Filename)
if err := w.storage.Put(ctx, objectKey, bytes.NewReader(part.Data), int64(len(part.Data)), part.ContentType); err != nil {
return err
}
_, err := w.db.Exec(ctx, `
INSERT INTO attachments (message_id, filename, content_type, size, s3_bucket, s3_key, content_id, is_inline)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, messageID, part.Filename, part.ContentType, len(part.Data), bucket, objectKey, part.ContentID, part.IsInline)
if err != nil {
_ = w.storage.Delete(ctx, objectKey)
return err
}
}
_, err = w.db.Exec(ctx, `UPDATE messages SET has_attachments = true, updated_at = NOW() WHERE id = $1`, messageID)
return err
}
func flagsToStrings(flags []imap.Flag) []string {
out := make([]string, len(flags))
for i, f := range flags {
out[i] = string(f)
}
return out
}
func (w *SyncWorker) parseCredentials(creds []byte) (string, string, error) {
if len(creds) == 0 {
return "", "", errors.New("missing credentials")
@ -302,19 +507,6 @@ func (w *SyncWorker) parseCredentials(creds []byte) (string, string, error) {
return w.credentials.Decrypt(creds)
}
func splitBytes(data []byte, sep byte) [][]byte {
var parts [][]byte
start := 0
for i, b := range data {
if b == sep {
parts = append(parts, data[start:i])
start = i + 1
}
}
parts = append(parts, data[start:])
return parts
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s

View File

@ -0,0 +1,45 @@
package imap
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
)
type folderSyncState struct {
FolderID string
UIDValidity uint32
HighestModSeq uint64
LastUID uint32
}
func loadFolderSyncState(ctx context.Context, db *pgxpool.Pool, accountID, remoteName string) (folderSyncState, bool, error) {
var state folderSyncState
err := db.QueryRow(ctx, `
SELECT id, uidvalidity, highest_modseq, last_uid
FROM mail_folders
WHERE account_id = $1 AND remote_name = $2
`, accountID, remoteName).Scan(&state.FolderID, &state.UIDValidity, &state.HighestModSeq, &state.LastUID)
if err != nil {
return folderSyncState{}, false, err
}
return state, true, nil
}
func saveFolderSyncState(ctx context.Context, db *pgxpool.Pool, folderID string, uidValidity uint32, highestModSeq uint64, lastUID uint32, messageCount int) error {
_, err := db.Exec(ctx, `
UPDATE mail_folders
SET uidvalidity = $2,
highest_modseq = $3,
last_uid = $4,
message_count = $5,
updated_at = NOW()
WHERE id = $1
`, folderID, uidValidity, highestModSeq, lastUID, messageCount)
return err
}
func resetFolderMessages(ctx context.Context, db *pgxpool.Pool, folderID string) error {
_, err := db.Exec(ctx, `DELETE FROM messages WHERE folder_id = $1`, folderID)
return err
}

View File

@ -8,11 +8,17 @@ import (
"strings"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/webhooks"
)
type WebhookExecutor interface {
Execute(ctx context.Context, templateID string, msgCtx *webhooks.MessageContext) error
}
type Engine struct {
db *pgxpool.Pool
logger *slog.Logger
db *pgxpool.Pool
logger *slog.Logger
webhookExec WebhookExecutor
}
func NewEngine(db *pgxpool.Pool) *Engine {
@ -22,6 +28,16 @@ func NewEngine(db *pgxpool.Pool) *Engine {
}
}
func NewEngineWithWebhooks(db *pgxpool.Pool, webhookExec WebhookExecutor) *Engine {
e := NewEngine(db)
e.webhookExec = webhookExec
return e
}
func (e *Engine) SetWebhookExecutor(webhookExec WebhookExecutor) {
e.webhookExec = webhookExec
}
type Rule struct {
ID string `json:"id"`
Name string `json:"name"`
@ -41,6 +57,13 @@ type Action struct {
Value string `json:"value"` // label name, folder name, email, webhook_id
}
type ActionResult struct {
Type string `json:"type"`
Value string `json:"value"`
OK bool `json:"ok"`
Error string `json:"error"`
}
type Message struct {
ID string `json:"id"`
From string `json:"from"`
@ -51,6 +74,10 @@ type Message struct {
}
func (e *Engine) Evaluate(ctx context.Context, userID string, msg *Message) error {
return e.EvaluateMessage(ctx, userID, msg)
}
func (e *Engine) EvaluateMessage(ctx context.Context, userID string, msg *Message) error {
rows, err := e.db.Query(ctx, `
SELECT id, name, conditions, actions
FROM mail_rules
@ -81,12 +108,10 @@ func (e *Engine) Evaluate(ctx context.Context, userID string, msg *Message) erro
if matchesAll(conditions, msg) {
e.logger.Info("rule matched", "rule_id", ruleID, "rule_name", name, "message_id", msg.ID)
for _, action := range actions {
if err := e.executeAction(ctx, action, msg); err != nil {
e.logger.Error("action failed", "rule_id", ruleID, "action", action.Type, "error", err)
}
results := e.executeRuleActions(ctx, ruleID, actions, msg)
if err := e.recordRuleExecution(ctx, ruleID, msg.ID, results); err != nil {
e.logger.Error("record rule execution", "rule_id", ruleID, "message_id", msg.ID, "error", err)
}
// Increment match count
e.db.Exec(ctx, `UPDATE mail_rules SET match_count = match_count + 1 WHERE id = $1`, ruleID)
}
}
@ -94,6 +119,58 @@ func (e *Engine) Evaluate(ctx context.Context, userID string, msg *Message) erro
return nil
}
func (e *Engine) executeRuleActions(ctx context.Context, ruleID string, actions []Action, msg *Message) []ActionResult {
results := make([]ActionResult, 0, len(actions))
for _, action := range actions {
err := e.executeAction(ctx, action, msg)
results = append(results, actionResultFrom(action, err))
if err != nil {
e.logger.Error("action failed", "rule_id", ruleID, "action", action.Type, "error", err)
}
}
return results
}
func actionResultFrom(action Action, err error) ActionResult {
result := ActionResult{
Type: action.Type,
Value: action.Value,
OK: err == nil,
}
if err != nil {
result.Error = err.Error()
}
return result
}
func (e *Engine) recordRuleExecution(ctx context.Context, ruleID, messageID string, results []ActionResult) error {
actionsJSON, err := json.Marshal(results)
if err != nil {
return fmt.Errorf("marshal actions_applied: %w", err)
}
execError := aggregateActionErrors(results)
_, err = e.db.Exec(ctx, `
INSERT INTO rule_executions (rule_id, message_id, actions_applied, error)
VALUES ($1, $2, $3, $4)
`, ruleID, messageID, actionsJSON, execError)
if err != nil {
return fmt.Errorf("insert rule_executions: %w", err)
}
return nil
}
func aggregateActionErrors(results []ActionResult) string {
var parts []string
for _, r := range results {
if !r.OK && r.Error != "" {
parts = append(parts, fmt.Sprintf("%s: %s", r.Type, r.Error))
}
}
return strings.Join(parts, "; ")
}
func matchesAll(conditions []Condition, msg *Message) bool {
for _, cond := range conditions {
if !matchCondition(cond, msg) {
@ -143,6 +220,36 @@ func matchCondition(cond Condition, msg *Message) bool {
}
}
func messageToWebhookContext(msg *Message) *webhooks.MessageContext {
senderName, senderEmail := parseFromAddress(msg.From)
return &webhooks.MessageContext{
SenderName: senderName,
SenderEmail: senderEmail,
Subject: msg.Subject,
BodyText: msg.BodyText,
Recipients: strings.Join(msg.To, ", "),
HasAttachment: msg.HasAttachments,
MessageID: msg.ID,
}
}
func parseFromAddress(from string) (name, email string) {
from = strings.TrimSpace(from)
if from == "" {
return "", ""
}
if i := strings.LastIndex(from, "<"); i >= 0 {
j := strings.LastIndex(from, ">")
if j > i {
email = strings.TrimSpace(from[i+1 : j])
name = strings.TrimSpace(from[:i])
name = strings.Trim(name, `"`)
return name, email
}
}
return "", from
}
func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message) error {
switch action.Type {
case "label":
@ -180,8 +287,10 @@ func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message)
`, msg.ID)
return err
case "webhook":
// Webhook execution is handled by the webhooks package
return nil
if e.webhookExec == nil {
return fmt.Errorf("webhook executor not configured")
}
return e.webhookExec.Execute(ctx, action.Value, messageToWebhookContext(msg))
default:
return fmt.Errorf("unknown action type: %s", action.Type)
}

View File

@ -2,8 +2,11 @@ package rules
import (
"context"
"encoding/json"
"strings"
"testing"
"github.com/ultisuite/ulti-backend/internal/mail/webhooks"
)
func testMessage() *Message {
@ -86,3 +89,124 @@ func TestExecuteAction_unknownType(t *testing.T) {
t.Fatalf("executeAction() error = %v, want unknown action type", err)
}
}
func TestActionResultJSON(t *testing.T) {
results := []ActionResult{
{Type: "label", Value: "important", OK: true},
{Type: "webhook", Value: "tpl-1", OK: false, Error: "request failed: timeout"},
}
data, err := json.Marshal(results)
if err != nil {
t.Fatalf("json.Marshal() error = %v", err)
}
var decoded []ActionResult
if err := json.Unmarshal(data, &decoded); err != nil {
t.Fatalf("json.Unmarshal() error = %v", err)
}
if len(decoded) != 2 {
t.Fatalf("len(decoded) = %d, want 2", len(decoded))
}
if decoded[0].Type != "label" || !decoded[0].OK || decoded[0].Error != "" {
t.Fatalf("decoded[0] = %+v, want label ok=true empty error", decoded[0])
}
if decoded[1].Type != "webhook" || decoded[1].OK || decoded[1].Error == "" {
t.Fatalf("decoded[1] = %+v, want webhook ok=false with error", decoded[1])
}
}
func TestActionResultFrom(t *testing.T) {
ok := actionResultFrom(Action{Type: "archive", Value: ""}, nil)
if !ok.OK || ok.Error != "" {
t.Fatalf("actionResultFrom(nil err) = %+v, want ok=true", ok)
}
fail := actionResultFrom(Action{Type: "webhook", Value: "tpl-1"}, context.DeadlineExceeded)
if fail.OK || fail.Error == "" {
t.Fatalf("actionResultFrom(err) = %+v, want ok=false with error", fail)
}
}
func TestAggregateActionErrors(t *testing.T) {
got := aggregateActionErrors([]ActionResult{
{Type: "label", OK: true},
{Type: "webhook", OK: false, Error: "timeout"},
{Type: "move", OK: false, Error: "folder missing"},
})
want := "webhook: timeout; move: folder missing"
if got != want {
t.Fatalf("aggregateActionErrors() = %q, want %q", got, want)
}
if aggregateActionErrors(nil) != "" {
t.Fatal("aggregateActionErrors(nil) want empty string")
}
}
func TestParseFromAddress(t *testing.T) {
tests := []struct {
from, wantName, wantEmail string
}{
{"Alice <alice@example.com>", "Alice", "alice@example.com"},
{`"Bob Smith" <bob@example.com>`, "Bob Smith", "bob@example.com"},
{"carol@example.com", "", "carol@example.com"},
{"", "", ""},
}
for _, tt := range tests {
name, email := parseFromAddress(tt.from)
if name != tt.wantName || email != tt.wantEmail {
t.Fatalf("parseFromAddress(%q) = (%q, %q), want (%q, %q)", tt.from, name, email, tt.wantName, tt.wantEmail)
}
}
}
func TestMessageToWebhookContext(t *testing.T) {
msg := testMessage()
ctx := messageToWebhookContext(msg)
if ctx.SenderName != "Alice" || ctx.SenderEmail != "alice@example.com" {
t.Fatalf("sender = (%q, %q), want (Alice, alice@example.com)", ctx.SenderName, ctx.SenderEmail)
}
if ctx.Subject != msg.Subject || ctx.BodyText != msg.BodyText {
t.Fatal("subject/body not copied")
}
if ctx.Recipients != "bob@example.com, carol@example.com" {
t.Fatalf("Recipients = %q", ctx.Recipients)
}
if !ctx.HasAttachment || ctx.MessageID != msg.ID {
t.Fatalf("HasAttachment/MessageID = (%v, %q)", ctx.HasAttachment, ctx.MessageID)
}
}
type mockWebhookExecutor struct {
templateID string
msgCtx *webhooks.MessageContext
err error
}
func (m *mockWebhookExecutor) Execute(_ context.Context, templateID string, msgCtx *webhooks.MessageContext) error {
m.templateID = templateID
m.msgCtx = msgCtx
return m.err
}
func TestExecuteAction_webhook(t *testing.T) {
msg := testMessage()
mock := &mockWebhookExecutor{}
e := &Engine{webhookExec: mock}
if err := e.executeAction(context.Background(), Action{Type: "webhook", Value: "tpl-abc"}, msg); err != nil {
t.Fatalf("executeAction() error = %v", err)
}
if mock.templateID != "tpl-abc" {
t.Fatalf("templateID = %q, want tpl-abc", mock.templateID)
}
if mock.msgCtx.MessageID != msg.ID {
t.Fatalf("msgCtx.MessageID = %q, want %q", mock.msgCtx.MessageID, msg.ID)
}
e.webhookExec = nil
err := e.executeAction(context.Background(), Action{Type: "webhook", Value: "tpl-abc"}, msg)
if err == nil || !strings.Contains(err.Error(), "webhook executor not configured") {
t.Fatalf("executeAction() without executor = %v, want not configured error", err)
}
}

View File

@ -0,0 +1,3 @@
DROP TABLE IF EXISTS rule_executions;
ALTER TABLE mail_folders DROP COLUMN IF EXISTS last_uid;

View File

@ -0,0 +1,14 @@
ALTER TABLE mail_folders ADD COLUMN last_uid BIGINT NOT NULL DEFAULT 0;
CREATE TABLE rule_executions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
rule_id UUID NOT NULL REFERENCES mail_rules(id) ON DELETE CASCADE,
message_id UUID NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
actions_applied JSONB NOT NULL DEFAULT '[]',
error TEXT NOT NULL DEFAULT '',
executed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_rule_executions_rule_id ON rule_executions(rule_id);
CREATE INDEX idx_rule_executions_message_id ON rule_executions(message_id);
CREATE INDEX idx_rule_executions_executed_at ON rule_executions(executed_at);

View File

@ -93,13 +93,13 @@ Objectif: transformer état actuel (partiellement implémenté) vers produit fon
### 2.2 Sync IMAP & pipeline mail
- [ ] Persister état sync incrémental fiable (UIDVALIDITY, MODSEQ, last seen UID).
- [ ] Support suppressions/updates IMAP, pas seulement insert nouveaux messages.
- [ ] Gérer dossiers spéciaux (Sent/Drafts/Trash/Archive/Spam) cross-provider.
- [ ] Extraire et stocker attachments vers object storage.
- [ ] Lancer rules engine à réception et tracer résultats.
- [ ] Déclencher webhooks selon règles matchées.
- [ ] Publier events realtime WS après sync.
- [x] Persister état sync incrémental fiable (UIDVALIDITY, MODSEQ, last seen UID).
- [x] Support suppressions/updates IMAP, pas seulement insert nouveaux messages.
- [x] Gérer dossiers spéciaux (Sent/Drafts/Trash/Archive/Spam) cross-provider.
- [x] Extraire et stocker attachments vers object storage.
- [x] Lancer rules engine à réception et tracer résultats.
- [x] Déclencher webhooks selon règles matchées.
- [x] Publier events realtime WS après sync.
### 2.3 SMTP / Outbox / Scheduling
@ -111,8 +111,8 @@ Objectif: transformer état actuel (partiellement implémenté) vers produit fon
### 2.4 Rules & Webhooks
- [ ] Câbler réellement `rules.Engine` dans pipeline réception.
- [ ] Câbler réellement `webhooks.Executor` depuis actions de règles.
- [x] Câbler réellement `rules.Engine` dans pipeline réception.
- [x] Câbler réellement `webhooks.Executor` depuis actions de règles.
- [ ] Ajouter simulation/test endpoint "run rule on sample message".
- [ ] Ajouter templates webhook versionnés + preview rendu variables.
- [ ] Ajouter signatures webhook (HMAC) + retry + backoff + DLQ.