ultisuite-backend/internal/mail/imap/pipeline.go
R3D347HR4Y bb5be669c1 Implement IMAP sync pipeline with rules and webhook support
- Introduced a new sync pipeline for IMAP that integrates a rules engine and webhook execution.
- Enhanced the `SyncWorker` to support attachment management and folder synchronization.
- Added functionality to detect special folder types (Sent, Drafts, Trash, Archive, Spam) during sync.
- Implemented a database schema for tracking rule executions and their outcomes.
- Created unit tests for the new rules engine and webhook execution logic.
- Updated migration scripts to accommodate new database structures for rule executions and folder states.
- Enhanced error handling and logging throughout the sync process for better observability.
2026-05-22 17:38:39 +02:00

132 lines
3.0 KiB
Go

package imap
import (
"context"
"encoding/json"
"log/slog"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/rules"
"github.com/ultisuite/ulti-backend/internal/realtime"
)
type postSyncEvent struct {
userID string
accountID string
messageID string
kind string // created | updated | deleted
}
// syncPipeline runs rules and realtime notifications after message sync.
type syncPipeline struct {
db *pgxpool.Pool
logger *slog.Logger
rules *rules.Engine
hub *realtime.Hub
}
func newSyncPipeline(db *pgxpool.Pool, rulesEngine *rules.Engine, hub *realtime.Hub) *syncPipeline {
return &syncPipeline{
db: db,
logger: slog.Default().With("component", "imap-pipeline"),
rules: rulesEngine,
hub: hub,
}
}
func (p *syncPipeline) handle(ctx context.Context, ev postSyncEvent) {
if ev.kind == "deleted" {
p.broadcast(ev, realtime.Event{
Type: "mail.deleted",
Payload: map[string]any{
"message_id": ev.messageID,
"account_id": ev.accountID,
},
})
return
}
if p.rules != nil && ev.kind == "created" {
msg, err := p.loadRuleMessage(ctx, ev.messageID)
if err != nil {
p.logger.Error("load message for rules", "message_id", ev.messageID, "error", err)
} else if err := p.rules.EvaluateMessage(ctx, ev.userID, msg); err != nil {
p.logger.Error("rules evaluation failed", "message_id", ev.messageID, "error", err)
}
}
eventType := "mail.updated"
if ev.kind == "created" {
eventType = "mail.created"
}
p.broadcast(ev, realtime.Event{
Type: eventType,
Payload: map[string]any{
"message_id": ev.messageID,
"account_id": ev.accountID,
},
})
}
func (p *syncPipeline) broadcast(ev postSyncEvent, event realtime.Event) {
if p.hub == nil || ev.userID == "" {
return
}
p.hub.Broadcast(ev.userID, event)
}
func (p *syncPipeline) loadRuleMessage(ctx context.Context, messageID string) (*rules.Message, error) {
var (
fromJSON []byte
toJSON []byte
subject string
bodyText string
hasAtt bool
)
err := p.db.QueryRow(ctx, `
SELECT from_addr, to_addrs, subject, body_text, has_attachments
FROM messages WHERE id = $1
`, messageID).Scan(&fromJSON, &toJSON, &subject, &bodyText, &hasAtt)
if err != nil {
return nil, err
}
from := firstAddressString(fromJSON)
to := addressListStrings(toJSON)
return &rules.Message{
ID: messageID,
From: from,
To: to,
Subject: subject,
BodyText: bodyText,
HasAttachments: hasAtt,
}, nil
}
func firstAddressString(fromJSON []byte) string {
var addrs []EmailAddress
if err := json.Unmarshal(fromJSON, &addrs); err != nil || len(addrs) == 0 {
return string(fromJSON)
}
a := addrs[0]
if a.Name != "" {
return a.Name + " <" + a.Address + ">"
}
return a.Address
}
func addressListStrings(toJSON []byte) []string {
var addrs []EmailAddress
if err := json.Unmarshal(toJSON, &addrs); err != nil {
return nil
}
out := make([]string, 0, len(addrs))
for _, a := range addrs {
if a.Address != "" {
out = append(out, a.Address)
}
}
return out
}