diff --git a/cmd/ultid/main.go b/cmd/ultid/main.go index defa104..17dc092 100644 --- a/cmd/ultid/main.go +++ b/cmd/ultid/main.go @@ -34,8 +34,10 @@ import ( "github.com/ultisuite/ulti-backend/internal/envexpand" mailcredentials "github.com/ultisuite/ulti-backend/internal/mail/credentials" imapsync "github.com/ultisuite/ulti-backend/internal/mail/imap" - mailstorage "github.com/ultisuite/ulti-backend/internal/mail/storage" + "github.com/ultisuite/ulti-backend/internal/mail/rules" "github.com/ultisuite/ulti-backend/internal/mail/smtp" + mailstorage "github.com/ultisuite/ulti-backend/internal/mail/storage" + "github.com/ultisuite/ulti-backend/internal/mail/webhooks" "github.com/ultisuite/ulti-backend/internal/meet" "github.com/ultisuite/ulti-backend/internal/nextcloud" "github.com/ultisuite/ulti-backend/internal/observability" @@ -136,8 +138,15 @@ func main() { hub := realtime.NewHub() healthChecker := observability.NewHealthChecker(cfg, pool, rdb) + rulesEngine := rules.NewEngineWithWebhooks(pool, webhooks.NewExecutor(pool)) + // Start background workers - go imapsync.NewSyncWorker(pool, cfg.MailSyncInterval, credentialManager).Start(ctx) + go imapsync.NewSyncWorker(pool, cfg.MailSyncInterval, credentialManager, imapsync.SyncDeps{ + Storage: attachmentStorage, + AttachBucket: cfg.MailAttachmentsBucket, + Rules: rulesEngine, + Hub: hub, + }).Start(ctx) sender := smtp.NewSender(pool, credentialManager) smtpCircuit := smtp.NewCircuitBreaker(cfg.MailSMTPCircuitFailures, cfg.MailSMTPCircuitCooldown) diff --git a/internal/mail/imap/attachments.go b/internal/mail/imap/attachments.go new file mode 100644 index 0000000..897a50c --- /dev/null +++ b/internal/mail/imap/attachments.go @@ -0,0 +1,159 @@ +package imap + +import ( + "bytes" + "encoding/base64" + "io" + "mime" + "mime/multipart" + "mime/quotedprintable" + "net/mail" + "strings" + + "github.com/ultisuite/ulti-backend/internal/mail/limits" +) + +// AttachmentPart is a decoded MIME body part stored as an attachment or inline resource. +type AttachmentPart struct { + Filename string + ContentType string + ContentID string + IsInline bool + Data []byte +} + +// ExtractAttachments parses raw RFC 822 message bytes and returns attachment parts. +// Body text/plain and text/html parts are skipped. Non-text parts are collected when +// Content-Disposition is attachment, or inline with a filename. Parts exceeding +// limits.MaxAttachmentBytes are skipped; collection stops at limits.MaxAttachmentsPerMessage. +func ExtractAttachments(raw []byte) ([]AttachmentPart, error) { + if len(raw) == 0 { + return nil, nil + } + + msg, err := mail.ReadMessage(bytes.NewReader(raw)) + if err != nil { + return nil, err + } + + contentType := msg.Header.Get("Content-Type") + if contentType == "" { + return nil, nil + } + + mediaType, params, err := mime.ParseMediaType(contentType) + if err != nil || !strings.HasPrefix(mediaType, "multipart/") { + return nil, nil + } + + return extractAttachmentsFromMultipart(msg.Body, params["boundary"]), nil +} + +func extractAttachmentsFromMultipart(r io.Reader, boundary string) []AttachmentPart { + var attachments []AttachmentPart + + mr := multipart.NewReader(r, boundary) + for { + part, err := mr.NextPart() + if err != nil { + break + } + if len(attachments) >= limits.MaxAttachmentsPerMessage { + break + } + + partType := part.Header.Get("Content-Type") + mediaType, params, _ := mime.ParseMediaType(partType) + if mediaType == "" { + mediaType = "application/octet-stream" + } + + switch { + case mediaType == "text/plain", mediaType == "text/html": + continue + case strings.HasPrefix(mediaType, "multipart/"): + nested := extractAttachmentsFromMultipart(part, params["boundary"]) + for _, att := range nested { + if len(attachments) >= limits.MaxAttachmentsPerMessage { + break + } + attachments = append(attachments, att) + } + default: + if att, ok := partToAttachment(part, mediaType, params); ok { + attachments = append(attachments, att) + } + } + } + + return attachments +} + +func partToAttachment(part *multipart.Part, mediaType string, typeParams map[string]string) (AttachmentPart, bool) { + if strings.HasPrefix(mediaType, "text/") { + return AttachmentPart{}, false + } + + disposition, dispParams, _ := mime.ParseMediaType(part.Header.Get("Content-Disposition")) + filename := dispParams["filename"] + if filename == "" { + filename = typeParams["name"] + } + + isInline := strings.EqualFold(disposition, "inline") + isAttachment := strings.EqualFold(disposition, "attachment") + + if !isAttachment && !(isInline && filename != "") { + return AttachmentPart{}, false + } + + data, err := io.ReadAll(part) + if err != nil { + return AttachmentPart{}, false + } + + data, err = decodePartBody(part.Header.Get("Content-Transfer-Encoding"), data) + if err != nil || len(data) > limits.MaxAttachmentBytes { + return AttachmentPart{}, false + } + + return AttachmentPart{ + Filename: filename, + ContentType: mediaType, + ContentID: normalizeContentID(part.Header.Get("Content-ID")), + IsInline: isInline, + Data: data, + }, true +} + +func normalizeContentID(raw string) string { + return strings.Trim(raw, "<> \t") +} + +func decodePartBody(transferEncoding string, data []byte) ([]byte, error) { + switch strings.ToLower(strings.TrimSpace(transferEncoding)) { + case "base64": + return decodeBase64Body(data) + case "quoted-printable": + return io.ReadAll(quotedprintable.NewReader(bytes.NewReader(data))) + default: + return data, nil + } +} + +func decodeBase64Body(data []byte) ([]byte, error) { + clean := bytes.Map(func(r rune) rune { + switch r { + case '\r', '\n', ' ', '\t': + return -1 + default: + return r + } + }, data) + decoded := make([]byte, base64.StdEncoding.DecodedLen(len(clean))) + n, err := base64.StdEncoding.Decode(decoded, clean) + if err != nil { + return nil, err + } + return decoded[:n], nil +} diff --git a/internal/mail/imap/attachments_test.go b/internal/mail/imap/attachments_test.go new file mode 100644 index 0000000..d326211 --- /dev/null +++ b/internal/mail/imap/attachments_test.go @@ -0,0 +1,157 @@ +package imap + +import ( + "encoding/base64" + "strings" + "testing" +) + +func TestExtractAttachments_plainAttachment(t *testing.T) { + pdfData := []byte("%PDF-1.4\n") + raw := buildMultipartMessage(t, "mixed", []mimePart{ + { + contentType: "text/plain", + body: []byte("Hello world"), + }, + { + contentType: "application/pdf; name=\"doc.pdf\"", + disposition: "attachment; filename=\"doc.pdf\"", + body: pdfData, + transferEnc: "base64", + }, + }) + + attachments, err := ExtractAttachments(raw) + if err != nil { + t.Fatalf("ExtractAttachments() error = %v", err) + } + if len(attachments) != 1 { + t.Fatalf("len(attachments) = %d, want 1", len(attachments)) + } + + att := attachments[0] + if att.Filename != "doc.pdf" { + t.Fatalf("Filename = %q, want doc.pdf", att.Filename) + } + if att.ContentType != "application/pdf" { + t.Fatalf("ContentType = %q, want application/pdf", att.ContentType) + } + if att.IsInline { + t.Fatal("IsInline = true, want false") + } + if att.ContentID != "" { + t.Fatalf("ContentID = %q, want empty", att.ContentID) + } + if string(att.Data) != string(pdfData) { + t.Fatalf("Data = %q, want %q", att.Data, pdfData) + } +} + +func TestExtractAttachments_inlineWithCID(t *testing.T) { + pngData := []byte{0x89, 'P', 'N', 'G', '\r', '\n', 0x1a, '\n'} + raw := buildMultipartMessage(t, "related", []mimePart{ + { + contentType: "text/html", + body: []byte(``), + }, + { + contentType: "image/png; name=\"logo.png\"", + disposition: "inline; filename=\"logo.png\"", + contentID: "", + body: pngData, + transferEnc: "base64", + }, + }) + + attachments, err := ExtractAttachments(raw) + if err != nil { + t.Fatalf("ExtractAttachments() error = %v", err) + } + if len(attachments) != 1 { + t.Fatalf("len(attachments) = %d, want 1", len(attachments)) + } + + att := attachments[0] + if att.Filename != "logo.png" { + t.Fatalf("Filename = %q, want logo.png", att.Filename) + } + if att.ContentType != "image/png" { + t.Fatalf("ContentType = %q, want image/png", att.ContentType) + } + if !att.IsInline { + t.Fatal("IsInline = false, want true") + } + if att.ContentID != "logo@cid" { + t.Fatalf("ContentID = %q, want logo@cid", att.ContentID) + } + if string(att.Data) != string(pngData) { + t.Fatalf("Data = %q, want %q", att.Data, pngData) + } +} + +func TestExtractAttachments_skipsBodyParts(t *testing.T) { + raw := buildMultipartMessage(t, "alternative", []mimePart{ + { + contentType: "text/plain", + body: []byte("plain body"), + }, + { + contentType: "text/html", + body: []byte("

html body

"), + }, + }) + + attachments, err := ExtractAttachments(raw) + if err != nil { + t.Fatalf("ExtractAttachments() error = %v", err) + } + if len(attachments) != 0 { + t.Fatalf("len(attachments) = %d, want 0", len(attachments)) + } +} + +type mimePart struct { + contentType string + disposition string + contentID string + transferEnc string + body []byte +} + +func buildMultipartMessage(t *testing.T, subtype string, parts []mimePart) []byte { + t.Helper() + + const boundary = "test-boundary" + var b strings.Builder + b.WriteString("From: sender@example.com\r\n") + b.WriteString("To: recipient@example.com\r\n") + b.WriteString("Subject: attachment test\r\n") + b.WriteString("MIME-Version: 1.0\r\n") + b.WriteString("Content-Type: multipart/" + subtype + "; boundary=\"" + boundary + "\"\r\n") + b.WriteString("\r\n") + + for _, part := range parts { + b.WriteString("--" + boundary + "\r\n") + b.WriteString("Content-Type: " + part.contentType + "\r\n") + if part.disposition != "" { + b.WriteString("Content-Disposition: " + part.disposition + "\r\n") + } + if part.contentID != "" { + b.WriteString("Content-ID: " + part.contentID + "\r\n") + } + if part.transferEnc != "" { + b.WriteString("Content-Transfer-Encoding: " + part.transferEnc + "\r\n") + } + b.WriteString("\r\n") + + if part.transferEnc == "base64" { + b.WriteString(base64.StdEncoding.EncodeToString(part.body)) + } else { + b.Write(part.body) + } + b.WriteString("\r\n") + } + b.WriteString("--" + boundary + "--\r\n") + + return []byte(b.String()) +} diff --git a/internal/mail/imap/folders.go b/internal/mail/imap/folders.go new file mode 100644 index 0000000..d9be5b3 --- /dev/null +++ b/internal/mail/imap/folders.go @@ -0,0 +1,110 @@ +package imap + +import ( + "strings" + + "github.com/emersion/go-imap/v2" +) + +var folderDisplayNames = map[string]string{ + "inbox": "Inbox", + "sent": "Sent", + "drafts": "Drafts", + "trash": "Trash", + "archive": "Archive", + "spam": "Spam", +} + +var folderNameHeuristics = map[string]string{ + "inbox": "inbox", + "sent": "sent", + "sent items": "sent", + "sent messages": "sent", + "[gmail]/sent mail": "sent", + "inbox.sent": "sent", + "drafts": "drafts", + "[gmail]/drafts": "drafts", + "trash": "trash", + "deleted": "trash", + "[gmail]/trash": "trash", + "archive": "archive", + "all mail": "archive", + "[gmail]/all mail": "archive", + "junk": "spam", + "spam": "spam", + "[gmail]/spam": "spam", +} + +// DetectFolderType classifies an IMAP mailbox for sync using SPECIAL-USE +// attributes when present, otherwise name heuristics. NoSelect mailboxes +// return "" so callers can skip sync. +func DetectFolderType(mailbox string, attrs []imap.MailboxAttr) string { + for _, attr := range attrs { + if attr == imap.MailboxAttrNoSelect { + return "" + } + } + + if folderType := folderTypeFromAttrs(attrs); folderType != "" { + return folderType + } + + return folderTypeFromName(mailbox) +} + +// DisplayName returns a user-facing folder label from the detected type, +// or the last path segment of mailbox when type is custom or unknown. +func DisplayName(mailbox, folderType string) string { + folderType = strings.ToLower(strings.TrimSpace(folderType)) + if name, ok := folderDisplayNames[folderType]; ok { + return name + } + return mailboxLeaf(mailbox) +} + +func folderTypeFromAttrs(attrs []imap.MailboxAttr) string { + for _, attr := range attrs { + switch attr { + case imap.MailboxAttrSent: + return "sent" + case imap.MailboxAttrDrafts: + return "drafts" + case imap.MailboxAttrTrash: + return "trash" + case imap.MailboxAttrArchive: + return "archive" + case imap.MailboxAttrJunk: + return "spam" + } + } + return "" +} + +func folderTypeFromName(mailbox string) string { + full := strings.ToLower(strings.TrimSpace(mailbox)) + if folderType, ok := folderNameHeuristics[full]; ok { + return folderType + } + + leaf := strings.ToLower(mailboxLeaf(mailbox)) + if folderType, ok := folderNameHeuristics[leaf]; ok { + return folderType + } + + return "custom" +} + +func mailboxLeaf(mailbox string) string { + mailbox = strings.TrimSpace(mailbox) + if mailbox == "" { + return "" + } + + leaf := mailbox + for _, sep := range []string{"/", "."} { + if idx := strings.LastIndex(leaf, sep); idx >= 0 && idx < len(leaf)-len(sep) { + leaf = leaf[idx+len(sep):] + } + } + return leaf +} diff --git a/internal/mail/imap/folders_test.go b/internal/mail/imap/folders_test.go new file mode 100644 index 0000000..d3b272c --- /dev/null +++ b/internal/mail/imap/folders_test.go @@ -0,0 +1,243 @@ +package imap + +import ( + "testing" + + "github.com/emersion/go-imap/v2" +) + +func TestDetectFolderType(t *testing.T) { + tests := []struct { + name string + mailbox string + attrs []imap.MailboxAttr + want string + }{ + { + name: "noselect returns empty", + mailbox: "Notes", + attrs: []imap.MailboxAttr{imap.MailboxAttrNoSelect}, + want: "", + }, + { + name: "noselect skipped even with sent name", + mailbox: "Sent", + attrs: []imap.MailboxAttr{imap.MailboxAttrNoSelect, imap.MailboxAttrSent}, + want: "", + }, + { + name: "attr sent", + mailbox: "Outbox", + attrs: []imap.MailboxAttr{imap.MailboxAttrSent}, + want: "sent", + }, + { + name: "attr drafts", + mailbox: "Brouillons", + attrs: []imap.MailboxAttr{imap.MailboxAttrDrafts}, + want: "drafts", + }, + { + name: "attr trash", + mailbox: "Corbeille", + attrs: []imap.MailboxAttr{imap.MailboxAttrTrash}, + want: "trash", + }, + { + name: "attr archive", + mailbox: "Old", + attrs: []imap.MailboxAttr{imap.MailboxAttrArchive}, + want: "archive", + }, + { + name: "attr junk", + mailbox: "Bulk", + attrs: []imap.MailboxAttr{imap.MailboxAttrJunk}, + want: "spam", + }, + { + name: "attrs beat heuristics", + mailbox: "INBOX", + attrs: []imap.MailboxAttr{imap.MailboxAttrSent}, + want: "sent", + }, + { + name: "inbox heuristic", + mailbox: "INBOX", + attrs: nil, + want: "inbox", + }, + { + name: "inbox lowercase", + mailbox: "inbox", + attrs: nil, + want: "inbox", + }, + { + name: "sent heuristic", + mailbox: "Sent", + attrs: nil, + want: "sent", + }, + { + name: "sent items heuristic", + mailbox: "Sent Items", + attrs: nil, + want: "sent", + }, + { + name: "sent messages heuristic", + mailbox: "Sent Messages", + attrs: nil, + want: "sent", + }, + { + name: "gmail sent heuristic", + mailbox: "[Gmail]/Sent Mail", + attrs: nil, + want: "sent", + }, + { + name: "inbox dot sent heuristic", + mailbox: "INBOX.Sent", + attrs: nil, + want: "sent", + }, + { + name: "drafts heuristic", + mailbox: "Drafts", + attrs: nil, + want: "drafts", + }, + { + name: "gmail drafts heuristic", + mailbox: "[Gmail]/Drafts", + attrs: nil, + want: "drafts", + }, + { + name: "trash heuristic", + mailbox: "Trash", + attrs: nil, + want: "trash", + }, + { + name: "deleted heuristic", + mailbox: "Deleted", + attrs: nil, + want: "trash", + }, + { + name: "gmail trash heuristic", + mailbox: "[Gmail]/Trash", + attrs: nil, + want: "trash", + }, + { + name: "archive heuristic", + mailbox: "Archive", + attrs: nil, + want: "archive", + }, + { + name: "all mail heuristic", + mailbox: "All Mail", + attrs: nil, + want: "archive", + }, + { + name: "gmail all mail heuristic", + mailbox: "[Gmail]/All Mail", + attrs: nil, + want: "archive", + }, + { + name: "junk heuristic", + mailbox: "Junk", + attrs: nil, + want: "spam", + }, + { + name: "spam heuristic", + mailbox: "Spam", + attrs: nil, + want: "spam", + }, + { + name: "gmail spam heuristic", + mailbox: "[Gmail]/Spam", + attrs: nil, + want: "spam", + }, + { + name: "custom folder", + mailbox: "Work/Projects", + attrs: nil, + want: "custom", + }, + { + name: "leaf sent in hierarchy", + mailbox: "Accounts/Sent", + attrs: nil, + want: "sent", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := DetectFolderType(tt.mailbox, tt.attrs) + if got != tt.want { + t.Fatalf("DetectFolderType(%q, %v) = %q, want %q", tt.mailbox, tt.attrs, got, tt.want) + } + }) + } +} + +func TestDisplayName(t *testing.T) { + tests := []struct { + name string + mailbox string + folderType string + want string + }{ + {name: "inbox type", mailbox: "INBOX", folderType: "inbox", want: "Inbox"}, + {name: "sent type", mailbox: "[Gmail]/Sent Mail", folderType: "sent", want: "Sent"}, + {name: "drafts type", mailbox: "Drafts", folderType: "drafts", want: "Drafts"}, + {name: "trash type", mailbox: "Trash", folderType: "trash", want: "Trash"}, + {name: "archive type", mailbox: "All Mail", folderType: "archive", want: "Archive"}, + {name: "spam type", mailbox: "Junk", folderType: "spam", want: "Spam"}, + { + name: "custom uses leaf", + mailbox: "Work/Projects", + folderType: "custom", + want: "Projects", + }, + { + name: "unknown type uses leaf", + mailbox: "Clients/ACME", + folderType: "", + want: "ACME", + }, + { + name: "dot hierarchy leaf", + mailbox: "INBOX.Work", + folderType: "custom", + want: "Work", + }, + { + name: "custom type casing ignored", + mailbox: "Reports/Q1", + folderType: "CUSTOM", + want: "Q1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := DisplayName(tt.mailbox, tt.folderType) + if got != tt.want { + t.Fatalf("DisplayName(%q, %q) = %q, want %q", tt.mailbox, tt.folderType, got, tt.want) + } + }) + } +} diff --git a/internal/mail/imap/pipeline.go b/internal/mail/imap/pipeline.go new file mode 100644 index 0000000..ada0184 --- /dev/null +++ b/internal/mail/imap/pipeline.go @@ -0,0 +1,131 @@ +package imap + +import ( + "context" + "encoding/json" + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/ultisuite/ulti-backend/internal/mail/rules" + "github.com/ultisuite/ulti-backend/internal/realtime" +) + +type postSyncEvent struct { + userID string + accountID string + messageID string + kind string // created | updated | deleted +} + +// syncPipeline runs rules and realtime notifications after message sync. +type syncPipeline struct { + db *pgxpool.Pool + logger *slog.Logger + rules *rules.Engine + hub *realtime.Hub +} + +func newSyncPipeline(db *pgxpool.Pool, rulesEngine *rules.Engine, hub *realtime.Hub) *syncPipeline { + return &syncPipeline{ + db: db, + logger: slog.Default().With("component", "imap-pipeline"), + rules: rulesEngine, + hub: hub, + } +} + +func (p *syncPipeline) handle(ctx context.Context, ev postSyncEvent) { + if ev.kind == "deleted" { + p.broadcast(ev, realtime.Event{ + Type: "mail.deleted", + Payload: map[string]any{ + "message_id": ev.messageID, + "account_id": ev.accountID, + }, + }) + return + } + + if p.rules != nil && ev.kind == "created" { + msg, err := p.loadRuleMessage(ctx, ev.messageID) + if err != nil { + p.logger.Error("load message for rules", "message_id", ev.messageID, "error", err) + } else if err := p.rules.EvaluateMessage(ctx, ev.userID, msg); err != nil { + p.logger.Error("rules evaluation failed", "message_id", ev.messageID, "error", err) + } + } + + eventType := "mail.updated" + if ev.kind == "created" { + eventType = "mail.created" + } + p.broadcast(ev, realtime.Event{ + Type: eventType, + Payload: map[string]any{ + "message_id": ev.messageID, + "account_id": ev.accountID, + }, + }) +} + +func (p *syncPipeline) broadcast(ev postSyncEvent, event realtime.Event) { + if p.hub == nil || ev.userID == "" { + return + } + p.hub.Broadcast(ev.userID, event) +} + +func (p *syncPipeline) loadRuleMessage(ctx context.Context, messageID string) (*rules.Message, error) { + var ( + fromJSON []byte + toJSON []byte + subject string + bodyText string + hasAtt bool + ) + err := p.db.QueryRow(ctx, ` + SELECT from_addr, to_addrs, subject, body_text, has_attachments + FROM messages WHERE id = $1 + `, messageID).Scan(&fromJSON, &toJSON, &subject, &bodyText, &hasAtt) + if err != nil { + return nil, err + } + + from := firstAddressString(fromJSON) + to := addressListStrings(toJSON) + + return &rules.Message{ + ID: messageID, + From: from, + To: to, + Subject: subject, + BodyText: bodyText, + HasAttachments: hasAtt, + }, nil +} + +func firstAddressString(fromJSON []byte) string { + var addrs []EmailAddress + if err := json.Unmarshal(fromJSON, &addrs); err != nil || len(addrs) == 0 { + return string(fromJSON) + } + a := addrs[0] + if a.Name != "" { + return a.Name + " <" + a.Address + ">" + } + return a.Address +} + +func addressListStrings(toJSON []byte) []string { + var addrs []EmailAddress + if err := json.Unmarshal(toJSON, &addrs); err != nil { + return nil + } + out := make([]string, 0, len(addrs)) + for _, a := range addrs { + if a.Address != "" { + out = append(out, a.Address) + } + } + return out +} diff --git a/internal/mail/imap/sync.go b/internal/mail/imap/sync.go index 5fa091c..c742b69 100644 --- a/internal/mail/imap/sync.go +++ b/internal/mail/imap/sync.go @@ -1,6 +1,7 @@ package imap import ( + "bytes" "context" "errors" "fmt" @@ -13,23 +14,41 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/ultisuite/ulti-backend/internal/mail/credentials" + "github.com/ultisuite/ulti-backend/internal/mail/limits" + "github.com/ultisuite/ulti-backend/internal/mail/rules" + "github.com/ultisuite/ulti-backend/internal/mail/storage" "github.com/ultisuite/ulti-backend/internal/mail/threading" "github.com/ultisuite/ulti-backend/internal/observability" + "github.com/ultisuite/ulti-backend/internal/realtime" ) -type SyncWorker struct { - db *pgxpool.Pool - logger *slog.Logger - interval time.Duration - credentials *credentials.Manager +// SyncDeps optional services wired into the IMAP sync worker. +type SyncDeps struct { + Storage *storage.Client + AttachBucket string + Rules *rules.Engine + Hub *realtime.Hub } -func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *credentials.Manager) *SyncWorker { +type SyncWorker struct { + db *pgxpool.Pool + logger *slog.Logger + interval time.Duration + credentials *credentials.Manager + storage *storage.Client + attachBucket string + pipeline *syncPipeline +} + +func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *credentials.Manager, deps SyncDeps) *SyncWorker { return &SyncWorker{ - db: db, - logger: slog.Default().With("component", "imap-sync"), - interval: interval, - credentials: credManager, + db: db, + logger: slog.Default().With("component", "imap-sync"), + interval: interval, + credentials: credManager, + storage: deps.Storage, + attachBucket: deps.AttachBucket, + pipeline: newSyncPipeline(db, deps.Rules, deps.Hub), } } @@ -38,8 +57,6 @@ func (w *SyncWorker) Start(ctx context.Context) { defer ticker.Stop() w.logger.Info("imap sync worker started", "interval", w.interval) - - // Initial sync w.runSyncCycle(ctx) for { @@ -65,12 +82,11 @@ func (w *SyncWorker) runSyncCycle(ctx context.Context) { func (w *SyncWorker) syncAllAccounts(ctx context.Context) error { rows, err := w.db.Query(ctx, ` - SELECT id, imap_host, imap_port, imap_tls, credentials, sync_state + SELECT id, imap_host, imap_port, imap_tls, credentials FROM mail_accounts WHERE is_active = true `) if err != nil { - w.logger.Error("failed to query accounts", "error", err) return err } defer rows.Close() @@ -83,15 +99,13 @@ func (w *SyncWorker) syncAllAccounts(ctx context.Context) error { port int useTLS bool creds []byte - syncState []byte ) - if err := rows.Scan(&accountID, &host, &port, &useTLS, &creds, &syncState); err != nil { + if err := rows.Scan(&accountID, &host, &port, &useTLS, &creds); err != nil { w.logger.Error("failed to scan account", "error", err) hasSyncError = true continue } - - if err := w.syncAccount(ctx, accountID, host, port, useTLS, creds, syncState); err != nil { + if err := w.syncAccount(ctx, accountID, host, port, useTLS, creds); err != nil { w.logger.Error("sync failed", "account_id", accountID, "error", err) hasSyncError = true } @@ -105,13 +119,15 @@ func (w *SyncWorker) syncAllAccounts(ctx context.Context) error { return nil } -func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, port int, useTLS bool, creds, syncState []byte) error { +func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, port int, useTLS bool, creds []byte) error { + userID, err := w.accountUserID(ctx, accountID) + if err != nil { + return err + } + addr := fmt.Sprintf("%s:%d", host, port) - - var client *imapclient.Client - var err error - opts := &imapclient.Options{} + var client *imapclient.Client if useTLS { client, err = imapclient.DialTLS(addr, opts) } else { @@ -126,86 +142,199 @@ func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, po if err != nil { return fmt.Errorf("decrypt credentials: %w", err) } - if err := client.Login(username, password).Wait(); err != nil { return fmt.Errorf("login: %w", err) } - // List mailboxes - mailboxes, err := client.List("", "*", nil).Collect() + listOpts := &imap.ListOptions{ReturnSpecialUse: true} + mailboxes, err := client.List("", "*", listOpts).Collect() if err != nil { return fmt.Errorf("list: %w", err) } for _, mbox := range mailboxes { - if err := w.syncFolder(ctx, client, accountID, mbox.Mailbox); err != nil { + folderType := DetectFolderType(mbox.Mailbox, mbox.Attrs) + if folderType == "" { + continue + } + if err := w.syncFolder(ctx, client, accountID, userID, mbox.Mailbox, folderType); err != nil { w.logger.Error("folder sync failed", "account_id", accountID, "folder", mbox.Mailbox, "error", err) } } - // Update last sync time _, err = w.db.Exec(ctx, `UPDATE mail_accounts SET last_sync_at = NOW() WHERE id = $1`, accountID) return err } -func (w *SyncWorker) syncFolder(ctx context.Context, client *imapclient.Client, accountID, folderName string) error { - selectData, err := client.Select(folderName, nil).Wait() +func (w *SyncWorker) accountUserID(ctx context.Context, accountID string) (string, error) { + var userID string + err := w.db.QueryRow(ctx, `SELECT user_id FROM mail_accounts WHERE id = $1`, accountID).Scan(&userID) + return userID, err +} + +func (w *SyncWorker) syncFolder(ctx context.Context, client *imapclient.Client, accountID, userID, folderName, folderType string) error { + selectData, err := client.Select(folderName, &imap.SelectOptions{CondStore: true}).Wait() if err != nil { return fmt.Errorf("select %s: %w", folderName, err) } - // Upsert folder record + displayName := DisplayName(folderName, folderType) + prevState, hasPrev, err := loadFolderSyncState(ctx, w.db, accountID, folderName) + if err != nil && !errors.Is(err, pgx.ErrNoRows) { + return fmt.Errorf("load folder state: %w", err) + } + var folderID string + if hasPrev && prevState.UIDValidity != 0 && prevState.UIDValidity != selectData.UIDValidity { + if err := resetFolderMessages(ctx, w.db, prevState.FolderID); err != nil { + return fmt.Errorf("reset folder after uidvalidity change: %w", err) + } + prevState.LastUID = 0 + prevState.HighestModSeq = 0 + } + err = w.db.QueryRow(ctx, ` - INSERT INTO mail_folders (account_id, name, remote_name, uidvalidity, message_count) - VALUES ($1, $2, $2, $3, $4) + INSERT INTO mail_folders (account_id, name, remote_name, folder_type, uidvalidity, highest_modseq, message_count, last_uid) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (account_id, remote_name) DO UPDATE - SET uidvalidity = EXCLUDED.uidvalidity, + SET name = EXCLUDED.name, + folder_type = EXCLUDED.folder_type, + uidvalidity = EXCLUDED.uidvalidity, + highest_modseq = GREATEST(mail_folders.highest_modseq, EXCLUDED.highest_modseq), message_count = EXCLUDED.message_count, updated_at = NOW() - RETURNING id - `, accountID, folderName, selectData.UIDValidity, selectData.NumMessages).Scan(&folderID) + RETURNING id, last_uid, highest_modseq + `, accountID, displayName, folderName, folderType, + selectData.UIDValidity, selectData.HighestModSeq, selectData.NumMessages, prevState.LastUID, + ).Scan(&folderID, &prevState.LastUID, &prevState.HighestModSeq) if err != nil { return fmt.Errorf("upsert folder: %w", err) } + prevState.FolderID = folderID + prevState.UIDValidity = selectData.UIDValidity if selectData.NumMessages == 0 { - return nil + return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, 0, 0) } - // Get highest UID we already have for this folder - var lastUID uint32 - _ = w.db.QueryRow(ctx, ` - SELECT COALESCE(MAX(uid), 0) FROM messages WHERE folder_id = $1 - `, folderID).Scan(&lastUID) + lastUID := prevState.LastUID + if lastUID > 0 { + if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, lastUID+1, 0, false); err != nil { + return err + } + } else { + if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, 1, 0, false); err != nil { + return err + } + } - // Fetch messages newer than our last UID + if selectData.HighestModSeq > 0 && prevState.HighestModSeq > 0 && selectData.HighestModSeq > prevState.HighestModSeq { + if err := w.fetchAndProcess(ctx, client, accountID, userID, folderID, 1, prevState.HighestModSeq, true); err != nil { + w.logger.Warn("condstore incremental fetch failed", "folder", folderName, "error", err) + } + } + + if err := w.reconcileDeletions(ctx, client, folderID, userID, accountID); err != nil { + w.logger.Warn("deletion reconcile failed", "folder", folderName, "error", err) + } + + var maxUID uint32 + _ = w.db.QueryRow(ctx, `SELECT COALESCE(MAX(uid), 0) FROM messages WHERE folder_id = $1`, folderID).Scan(&maxUID) + + return saveFolderSyncState(ctx, w.db, folderID, selectData.UIDValidity, selectData.HighestModSeq, maxUID, int(selectData.NumMessages)) +} + +func (w *SyncWorker) fetchAndProcess(ctx context.Context, client *imapclient.Client, accountID, userID, folderID string, fromUID uint32, changedSince uint64, updatesOnly bool) error { seqSet := imap.UIDSet{} - seqSet.AddRange(imap.UID(lastUID+1), imap.UID(0)) // lastUID+1 to * + seqSet.AddRange(imap.UID(fromUID), imap.UID(0)) fetchOpts := &imap.FetchOptions{ UID: true, Flags: true, Envelope: true, + ModSeq: changedSince > 0, BodySection: []*imap.FetchItemBodySection{{}}, } + if changedSince > 0 { + fetchOpts.ChangedSince = changedSince + } fetchCmd := client.Fetch(seqSet, fetchOpts) - for { msg := fetchCmd.Next() if msg == nil { break } - if err := w.processMessage(ctx, msg, accountID, folderID); err != nil { - w.logger.Error("process message failed", "folder", folderName, "error", err) + kind, messageID, err := w.processMessage(ctx, msg, accountID, userID, folderID, updatesOnly) + if err != nil { + w.logger.Error("process message failed", "folder_id", folderID, "error", err) + continue + } + if kind != "" && w.pipeline != nil { + w.pipeline.handle(ctx, postSyncEvent{ + userID: userID, accountID: accountID, messageID: messageID, kind: kind, + }) } } - return fetchCmd.Close() } -func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMessageData, accountID, folderID string) error { +func (w *SyncWorker) reconcileDeletions(ctx context.Context, client *imapclient.Client, folderID, userID, accountID string) error { + searchData, err := client.UIDSearch(&imap.SearchCriteria{}, &imap.SearchOptions{ReturnAll: true}).Wait() + if err != nil { + return err + } + remoteUIDs := uidSetToMap(searchData.All) + if len(remoteUIDs) == 0 && searchData.Count == 0 { + return nil + } + + rows, err := w.db.Query(ctx, `SELECT id, uid FROM messages WHERE folder_id = $1`, folderID) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var messageID string + var uid uint32 + if err := rows.Scan(&messageID, &uid); err != nil { + return err + } + if !remoteUIDs[uid] { + if _, err := w.db.Exec(ctx, `DELETE FROM messages WHERE id = $1`, messageID); err != nil { + return err + } + if w.pipeline != nil { + w.pipeline.handle(ctx, postSyncEvent{ + userID: userID, accountID: accountID, messageID: messageID, kind: "deleted", + }) + } + } + } + return rows.Err() +} + +func uidSetToMap(set imap.NumSet) map[uint32]bool { + out := make(map[uint32]bool) + if set == nil { + return out + } + uidSet, ok := set.(imap.UIDSet) + if !ok { + return out + } + uids, ok := uidSet.Nums() + if !ok { + return out + } + for _, uid := range uids { + out[uint32(uid)] = true + } + return out +} + +func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMessageData, accountID, userID, folderID string, updatesOnly bool) (kind, messageID string, err error) { var envelope *imap.Envelope var uid imap.UID var flags []imap.Flag @@ -240,19 +369,14 @@ func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMe } } - if envelope == nil { - return nil - } - - flagStrs := make([]string, len(flags)) - for i, f := range flags { - flagStrs[i] = string(f) + if envelope == nil || uid == 0 { + return "", "", nil } + flagStrs := flagsToStrings(flags) fromAddr := addressesToJSON(envelope.From) toAddrs := addressesToJSON(envelope.To) ccAddrs := addressesToJSON(envelope.Cc) - bodyText, bodyHTML := parseBody(bodyContent) snippet := truncate(bodyText, 200) @@ -266,29 +390,110 @@ func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMe references = threading.ParseMessageIDs(strings.Join(envelope.InReplyTo, " ")) } - var rowID string - err := w.db.QueryRow(ctx, ` + var existed bool + _ = w.db.QueryRow(ctx, ` + SELECT EXISTS(SELECT 1 FROM messages WHERE folder_id = $1 AND uid = $2) + `, folderID, uid).Scan(&existed) + + err = w.db.QueryRow(ctx, ` INSERT INTO messages (account_id, folder_id, uid, message_id, subject, from_addr, to_addrs, cc_addrs, date, snippet, body_text, body_html, flags, in_reply_to, references_header) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) - ON CONFLICT (folder_id, uid) DO NOTHING + ON CONFLICT (folder_id, uid) DO UPDATE SET + message_id = EXCLUDED.message_id, + subject = EXCLUDED.subject, + from_addr = EXCLUDED.from_addr, + to_addrs = EXCLUDED.to_addrs, + cc_addrs = EXCLUDED.cc_addrs, + date = EXCLUDED.date, + snippet = EXCLUDED.snippet, + body_text = EXCLUDED.body_text, + body_html = EXCLUDED.body_html, + flags = EXCLUDED.flags, + in_reply_to = EXCLUDED.in_reply_to, + references_header = EXCLUDED.references_header, + updated_at = NOW() RETURNING id `, accountID, folderID, uid, envelope.MessageID, envelope.Subject, - fromAddr, toAddrs, ccAddrs, envelope.Date, snippet, bodyText, bodyHTML, flagStrs, inReplyTo, references).Scan(&rowID) + fromAddr, toAddrs, ccAddrs, envelope.Date, snippet, bodyText, bodyHTML, flagStrs, inReplyTo, references, + ).Scan(&messageID) if err != nil { - if errors.Is(err, pgx.ErrNoRows) { - return nil - } - return err + return "", "", err } threadID, err := threading.AssignThreadID(ctx, w.db, accountID, inReplyTo, references) if err != nil { + return "", "", err + } + _, err = w.db.Exec(ctx, `UPDATE messages SET thread_id = $1, updated_at = NOW() WHERE id = $2`, threadID, messageID) + if err != nil { + return "", "", err + } + + if err := w.storeAttachments(ctx, userID, messageID, bodyContent, existed); err != nil { + w.logger.Warn("attachment store failed", "message_id", messageID, "error", err) + } + + if existed { + return "updated", messageID, nil + } + if updatesOnly { + return "updated", messageID, nil + } + return "created", messageID, nil +} + +func (w *SyncWorker) storeAttachments(ctx context.Context, userID, messageID string, raw []byte, messageExisted bool) error { + if w.storage == nil || len(raw) == 0 { + return nil + } + if messageExisted { + var attCount int + _ = w.db.QueryRow(ctx, `SELECT COUNT(*) FROM attachments WHERE message_id = $1`, messageID).Scan(&attCount) + if attCount > 0 { + return nil + } + } + + parts, err := ExtractAttachments(raw) + if err != nil || len(parts) == 0 { return err } - _, err = w.db.Exec(ctx, `UPDATE messages SET thread_id = $1, updated_at = NOW() WHERE id = $2`, threadID, rowID) + + bucket := w.attachBucket + if bucket == "" { + bucket = "mail-attachments" + } + + for _, part := range parts { + if err := limits.ValidateAttachmentSize(int64(len(part.Data))); err != nil { + continue + } + objectKey := storage.MessageObjectKey(userID, messageID, part.Filename) + if err := w.storage.Put(ctx, objectKey, bytes.NewReader(part.Data), int64(len(part.Data)), part.ContentType); err != nil { + return err + } + _, err := w.db.Exec(ctx, ` + INSERT INTO attachments (message_id, filename, content_type, size, s3_bucket, s3_key, content_id, is_inline) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + `, messageID, part.Filename, part.ContentType, len(part.Data), bucket, objectKey, part.ContentID, part.IsInline) + if err != nil { + _ = w.storage.Delete(ctx, objectKey) + return err + } + } + + _, err = w.db.Exec(ctx, `UPDATE messages SET has_attachments = true, updated_at = NOW() WHERE id = $1`, messageID) return err } +func flagsToStrings(flags []imap.Flag) []string { + out := make([]string, len(flags)) + for i, f := range flags { + out[i] = string(f) + } + return out +} + func (w *SyncWorker) parseCredentials(creds []byte) (string, string, error) { if len(creds) == 0 { return "", "", errors.New("missing credentials") @@ -302,19 +507,6 @@ func (w *SyncWorker) parseCredentials(creds []byte) (string, string, error) { return w.credentials.Decrypt(creds) } -func splitBytes(data []byte, sep byte) [][]byte { - var parts [][]byte - start := 0 - for i, b := range data { - if b == sep { - parts = append(parts, data[start:i]) - start = i + 1 - } - } - parts = append(parts, data[start:]) - return parts -} - func truncate(s string, maxLen int) string { if len(s) <= maxLen { return s diff --git a/internal/mail/imap/sync_state.go b/internal/mail/imap/sync_state.go new file mode 100644 index 0000000..ecffe26 --- /dev/null +++ b/internal/mail/imap/sync_state.go @@ -0,0 +1,45 @@ +package imap + +import ( + "context" + + "github.com/jackc/pgx/v5/pgxpool" +) + +type folderSyncState struct { + FolderID string + UIDValidity uint32 + HighestModSeq uint64 + LastUID uint32 +} + +func loadFolderSyncState(ctx context.Context, db *pgxpool.Pool, accountID, remoteName string) (folderSyncState, bool, error) { + var state folderSyncState + err := db.QueryRow(ctx, ` + SELECT id, uidvalidity, highest_modseq, last_uid + FROM mail_folders + WHERE account_id = $1 AND remote_name = $2 + `, accountID, remoteName).Scan(&state.FolderID, &state.UIDValidity, &state.HighestModSeq, &state.LastUID) + if err != nil { + return folderSyncState{}, false, err + } + return state, true, nil +} + +func saveFolderSyncState(ctx context.Context, db *pgxpool.Pool, folderID string, uidValidity uint32, highestModSeq uint64, lastUID uint32, messageCount int) error { + _, err := db.Exec(ctx, ` + UPDATE mail_folders + SET uidvalidity = $2, + highest_modseq = $3, + last_uid = $4, + message_count = $5, + updated_at = NOW() + WHERE id = $1 + `, folderID, uidValidity, highestModSeq, lastUID, messageCount) + return err +} + +func resetFolderMessages(ctx context.Context, db *pgxpool.Pool, folderID string) error { + _, err := db.Exec(ctx, `DELETE FROM messages WHERE folder_id = $1`, folderID) + return err +} diff --git a/internal/mail/rules/engine.go b/internal/mail/rules/engine.go index 6a90d1c..f59d3b0 100644 --- a/internal/mail/rules/engine.go +++ b/internal/mail/rules/engine.go @@ -8,11 +8,17 @@ import ( "strings" "github.com/jackc/pgx/v5/pgxpool" + "github.com/ultisuite/ulti-backend/internal/mail/webhooks" ) +type WebhookExecutor interface { + Execute(ctx context.Context, templateID string, msgCtx *webhooks.MessageContext) error +} + type Engine struct { - db *pgxpool.Pool - logger *slog.Logger + db *pgxpool.Pool + logger *slog.Logger + webhookExec WebhookExecutor } func NewEngine(db *pgxpool.Pool) *Engine { @@ -22,6 +28,16 @@ func NewEngine(db *pgxpool.Pool) *Engine { } } +func NewEngineWithWebhooks(db *pgxpool.Pool, webhookExec WebhookExecutor) *Engine { + e := NewEngine(db) + e.webhookExec = webhookExec + return e +} + +func (e *Engine) SetWebhookExecutor(webhookExec WebhookExecutor) { + e.webhookExec = webhookExec +} + type Rule struct { ID string `json:"id"` Name string `json:"name"` @@ -41,6 +57,13 @@ type Action struct { Value string `json:"value"` // label name, folder name, email, webhook_id } +type ActionResult struct { + Type string `json:"type"` + Value string `json:"value"` + OK bool `json:"ok"` + Error string `json:"error"` +} + type Message struct { ID string `json:"id"` From string `json:"from"` @@ -51,6 +74,10 @@ type Message struct { } func (e *Engine) Evaluate(ctx context.Context, userID string, msg *Message) error { + return e.EvaluateMessage(ctx, userID, msg) +} + +func (e *Engine) EvaluateMessage(ctx context.Context, userID string, msg *Message) error { rows, err := e.db.Query(ctx, ` SELECT id, name, conditions, actions FROM mail_rules @@ -81,12 +108,10 @@ func (e *Engine) Evaluate(ctx context.Context, userID string, msg *Message) erro if matchesAll(conditions, msg) { e.logger.Info("rule matched", "rule_id", ruleID, "rule_name", name, "message_id", msg.ID) - for _, action := range actions { - if err := e.executeAction(ctx, action, msg); err != nil { - e.logger.Error("action failed", "rule_id", ruleID, "action", action.Type, "error", err) - } + results := e.executeRuleActions(ctx, ruleID, actions, msg) + if err := e.recordRuleExecution(ctx, ruleID, msg.ID, results); err != nil { + e.logger.Error("record rule execution", "rule_id", ruleID, "message_id", msg.ID, "error", err) } - // Increment match count e.db.Exec(ctx, `UPDATE mail_rules SET match_count = match_count + 1 WHERE id = $1`, ruleID) } } @@ -94,6 +119,58 @@ func (e *Engine) Evaluate(ctx context.Context, userID string, msg *Message) erro return nil } +func (e *Engine) executeRuleActions(ctx context.Context, ruleID string, actions []Action, msg *Message) []ActionResult { + results := make([]ActionResult, 0, len(actions)) + for _, action := range actions { + err := e.executeAction(ctx, action, msg) + results = append(results, actionResultFrom(action, err)) + if err != nil { + e.logger.Error("action failed", "rule_id", ruleID, "action", action.Type, "error", err) + } + } + return results +} + +func actionResultFrom(action Action, err error) ActionResult { + result := ActionResult{ + Type: action.Type, + Value: action.Value, + OK: err == nil, + } + if err != nil { + result.Error = err.Error() + } + return result +} + +func (e *Engine) recordRuleExecution(ctx context.Context, ruleID, messageID string, results []ActionResult) error { + actionsJSON, err := json.Marshal(results) + if err != nil { + return fmt.Errorf("marshal actions_applied: %w", err) + } + + execError := aggregateActionErrors(results) + + _, err = e.db.Exec(ctx, ` + INSERT INTO rule_executions (rule_id, message_id, actions_applied, error) + VALUES ($1, $2, $3, $4) + `, ruleID, messageID, actionsJSON, execError) + if err != nil { + return fmt.Errorf("insert rule_executions: %w", err) + } + return nil +} + +func aggregateActionErrors(results []ActionResult) string { + var parts []string + for _, r := range results { + if !r.OK && r.Error != "" { + parts = append(parts, fmt.Sprintf("%s: %s", r.Type, r.Error)) + } + } + return strings.Join(parts, "; ") +} + func matchesAll(conditions []Condition, msg *Message) bool { for _, cond := range conditions { if !matchCondition(cond, msg) { @@ -143,6 +220,36 @@ func matchCondition(cond Condition, msg *Message) bool { } } +func messageToWebhookContext(msg *Message) *webhooks.MessageContext { + senderName, senderEmail := parseFromAddress(msg.From) + return &webhooks.MessageContext{ + SenderName: senderName, + SenderEmail: senderEmail, + Subject: msg.Subject, + BodyText: msg.BodyText, + Recipients: strings.Join(msg.To, ", "), + HasAttachment: msg.HasAttachments, + MessageID: msg.ID, + } +} + +func parseFromAddress(from string) (name, email string) { + from = strings.TrimSpace(from) + if from == "" { + return "", "" + } + if i := strings.LastIndex(from, "<"); i >= 0 { + j := strings.LastIndex(from, ">") + if j > i { + email = strings.TrimSpace(from[i+1 : j]) + name = strings.TrimSpace(from[:i]) + name = strings.Trim(name, `"`) + return name, email + } + } + return "", from +} + func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message) error { switch action.Type { case "label": @@ -180,8 +287,10 @@ func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message) `, msg.ID) return err case "webhook": - // Webhook execution is handled by the webhooks package - return nil + if e.webhookExec == nil { + return fmt.Errorf("webhook executor not configured") + } + return e.webhookExec.Execute(ctx, action.Value, messageToWebhookContext(msg)) default: return fmt.Errorf("unknown action type: %s", action.Type) } diff --git a/internal/mail/rules/engine_test.go b/internal/mail/rules/engine_test.go index 72c9307..8075f9c 100644 --- a/internal/mail/rules/engine_test.go +++ b/internal/mail/rules/engine_test.go @@ -2,8 +2,11 @@ package rules import ( "context" + "encoding/json" "strings" "testing" + + "github.com/ultisuite/ulti-backend/internal/mail/webhooks" ) func testMessage() *Message { @@ -86,3 +89,124 @@ func TestExecuteAction_unknownType(t *testing.T) { t.Fatalf("executeAction() error = %v, want unknown action type", err) } } + +func TestActionResultJSON(t *testing.T) { + results := []ActionResult{ + {Type: "label", Value: "important", OK: true}, + {Type: "webhook", Value: "tpl-1", OK: false, Error: "request failed: timeout"}, + } + + data, err := json.Marshal(results) + if err != nil { + t.Fatalf("json.Marshal() error = %v", err) + } + + var decoded []ActionResult + if err := json.Unmarshal(data, &decoded); err != nil { + t.Fatalf("json.Unmarshal() error = %v", err) + } + if len(decoded) != 2 { + t.Fatalf("len(decoded) = %d, want 2", len(decoded)) + } + if decoded[0].Type != "label" || !decoded[0].OK || decoded[0].Error != "" { + t.Fatalf("decoded[0] = %+v, want label ok=true empty error", decoded[0]) + } + if decoded[1].Type != "webhook" || decoded[1].OK || decoded[1].Error == "" { + t.Fatalf("decoded[1] = %+v, want webhook ok=false with error", decoded[1]) + } +} + +func TestActionResultFrom(t *testing.T) { + ok := actionResultFrom(Action{Type: "archive", Value: ""}, nil) + if !ok.OK || ok.Error != "" { + t.Fatalf("actionResultFrom(nil err) = %+v, want ok=true", ok) + } + + fail := actionResultFrom(Action{Type: "webhook", Value: "tpl-1"}, context.DeadlineExceeded) + if fail.OK || fail.Error == "" { + t.Fatalf("actionResultFrom(err) = %+v, want ok=false with error", fail) + } +} + +func TestAggregateActionErrors(t *testing.T) { + got := aggregateActionErrors([]ActionResult{ + {Type: "label", OK: true}, + {Type: "webhook", OK: false, Error: "timeout"}, + {Type: "move", OK: false, Error: "folder missing"}, + }) + want := "webhook: timeout; move: folder missing" + if got != want { + t.Fatalf("aggregateActionErrors() = %q, want %q", got, want) + } + if aggregateActionErrors(nil) != "" { + t.Fatal("aggregateActionErrors(nil) want empty string") + } +} + +func TestParseFromAddress(t *testing.T) { + tests := []struct { + from, wantName, wantEmail string + }{ + {"Alice ", "Alice", "alice@example.com"}, + {`"Bob Smith" `, "Bob Smith", "bob@example.com"}, + {"carol@example.com", "", "carol@example.com"}, + {"", "", ""}, + } + for _, tt := range tests { + name, email := parseFromAddress(tt.from) + if name != tt.wantName || email != tt.wantEmail { + t.Fatalf("parseFromAddress(%q) = (%q, %q), want (%q, %q)", tt.from, name, email, tt.wantName, tt.wantEmail) + } + } +} + +func TestMessageToWebhookContext(t *testing.T) { + msg := testMessage() + ctx := messageToWebhookContext(msg) + if ctx.SenderName != "Alice" || ctx.SenderEmail != "alice@example.com" { + t.Fatalf("sender = (%q, %q), want (Alice, alice@example.com)", ctx.SenderName, ctx.SenderEmail) + } + if ctx.Subject != msg.Subject || ctx.BodyText != msg.BodyText { + t.Fatal("subject/body not copied") + } + if ctx.Recipients != "bob@example.com, carol@example.com" { + t.Fatalf("Recipients = %q", ctx.Recipients) + } + if !ctx.HasAttachment || ctx.MessageID != msg.ID { + t.Fatalf("HasAttachment/MessageID = (%v, %q)", ctx.HasAttachment, ctx.MessageID) + } +} + +type mockWebhookExecutor struct { + templateID string + msgCtx *webhooks.MessageContext + err error +} + +func (m *mockWebhookExecutor) Execute(_ context.Context, templateID string, msgCtx *webhooks.MessageContext) error { + m.templateID = templateID + m.msgCtx = msgCtx + return m.err +} + +func TestExecuteAction_webhook(t *testing.T) { + msg := testMessage() + mock := &mockWebhookExecutor{} + e := &Engine{webhookExec: mock} + + if err := e.executeAction(context.Background(), Action{Type: "webhook", Value: "tpl-abc"}, msg); err != nil { + t.Fatalf("executeAction() error = %v", err) + } + if mock.templateID != "tpl-abc" { + t.Fatalf("templateID = %q, want tpl-abc", mock.templateID) + } + if mock.msgCtx.MessageID != msg.ID { + t.Fatalf("msgCtx.MessageID = %q, want %q", mock.msgCtx.MessageID, msg.ID) + } + + e.webhookExec = nil + err := e.executeAction(context.Background(), Action{Type: "webhook", Value: "tpl-abc"}, msg) + if err == nil || !strings.Contains(err.Error(), "webhook executor not configured") { + t.Fatalf("executeAction() without executor = %v, want not configured error", err) + } +} diff --git a/migrations/000009_imap_sync_pipeline.down.sql b/migrations/000009_imap_sync_pipeline.down.sql new file mode 100644 index 0000000..43a9dba --- /dev/null +++ b/migrations/000009_imap_sync_pipeline.down.sql @@ -0,0 +1,3 @@ +DROP TABLE IF EXISTS rule_executions; + +ALTER TABLE mail_folders DROP COLUMN IF EXISTS last_uid; diff --git a/migrations/000009_imap_sync_pipeline.up.sql b/migrations/000009_imap_sync_pipeline.up.sql new file mode 100644 index 0000000..938c121 --- /dev/null +++ b/migrations/000009_imap_sync_pipeline.up.sql @@ -0,0 +1,14 @@ +ALTER TABLE mail_folders ADD COLUMN last_uid BIGINT NOT NULL DEFAULT 0; + +CREATE TABLE rule_executions ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + rule_id UUID NOT NULL REFERENCES mail_rules(id) ON DELETE CASCADE, + message_id UUID NOT NULL REFERENCES messages(id) ON DELETE CASCADE, + actions_applied JSONB NOT NULL DEFAULT '[]', + error TEXT NOT NULL DEFAULT '', + executed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_rule_executions_rule_id ON rule_executions(rule_id); +CREATE INDEX idx_rule_executions_message_id ON rule_executions(message_id); +CREATE INDEX idx_rule_executions_executed_at ON rule_executions(executed_at); diff --git a/project-plan/checklist-execution.md b/project-plan/checklist-execution.md index e7eadd2..7ae8c57 100644 --- a/project-plan/checklist-execution.md +++ b/project-plan/checklist-execution.md @@ -93,13 +93,13 @@ Objectif: transformer état actuel (partiellement implémenté) vers produit fon ### 2.2 Sync IMAP & pipeline mail -- [ ] Persister état sync incrémental fiable (UIDVALIDITY, MODSEQ, last seen UID). -- [ ] Support suppressions/updates IMAP, pas seulement insert nouveaux messages. -- [ ] Gérer dossiers spéciaux (Sent/Drafts/Trash/Archive/Spam) cross-provider. -- [ ] Extraire et stocker attachments vers object storage. -- [ ] Lancer rules engine à réception et tracer résultats. -- [ ] Déclencher webhooks selon règles matchées. -- [ ] Publier events realtime WS après sync. +- [x] Persister état sync incrémental fiable (UIDVALIDITY, MODSEQ, last seen UID). +- [x] Support suppressions/updates IMAP, pas seulement insert nouveaux messages. +- [x] Gérer dossiers spéciaux (Sent/Drafts/Trash/Archive/Spam) cross-provider. +- [x] Extraire et stocker attachments vers object storage. +- [x] Lancer rules engine à réception et tracer résultats. +- [x] Déclencher webhooks selon règles matchées. +- [x] Publier events realtime WS après sync. ### 2.3 SMTP / Outbox / Scheduling @@ -111,8 +111,8 @@ Objectif: transformer état actuel (partiellement implémenté) vers produit fon ### 2.4 Rules & Webhooks -- [ ] Câbler réellement `rules.Engine` dans pipeline réception. -- [ ] Câbler réellement `webhooks.Executor` depuis actions de règles. +- [x] Câbler réellement `rules.Engine` dans pipeline réception. +- [x] Câbler réellement `webhooks.Executor` depuis actions de règles. - [ ] Ajouter simulation/test endpoint "run rule on sample message". - [ ] Ajouter templates webhook versionnés + preview rendu variables. - [ ] Ajouter signatures webhook (HMAC) + retry + backoff + DLQ.