package rules import ( "context" "encoding/json" "fmt" "log/slog" "regexp" "strings" "github.com/jackc/pgx/v5/pgxpool" "github.com/ultisuite/ulti-backend/internal/mail/webhooks" ) type WebhookExecutor interface { Execute(ctx context.Context, templateID string, msgCtx *webhooks.MessageContext) error } type Engine struct { db *pgxpool.Pool logger *slog.Logger webhookExec WebhookExecutor } func NewEngine(db *pgxpool.Pool) *Engine { return &Engine{ db: db, logger: slog.Default().With("component", "rules-engine"), } } func NewEngineWithWebhooks(db *pgxpool.Pool, webhookExec WebhookExecutor) *Engine { e := NewEngine(db) e.webhookExec = webhookExec return e } func (e *Engine) SetWebhookExecutor(webhookExec WebhookExecutor) { e.webhookExec = webhookExec } type Rule struct { ID string `json:"id"` Name string `json:"name"` Priority int `json:"priority"` Conditions []Condition `json:"conditions"` Actions []Action `json:"actions"` } type Condition struct { Field string `json:"field"` // from, to, subject, body, has_attachment Operator string `json:"operator"` // contains, equals, starts_with, ends_with, regex, has, not_has, ... Value string `json:"value"` } type Action struct { Type string `json:"type"` // label, move, archive, delete, mark_read, forward, webhook Value string `json:"value"` // label name, folder name, email, webhook_id } type ActionResult struct { Type string `json:"type"` Value string `json:"value"` OK bool `json:"ok"` Error string `json:"error"` } type Message struct { ID string `json:"id"` AccountID string `json:"account_id,omitempty"` FolderID string `json:"folder_id,omitempty"` From string `json:"from"` To []string `json:"to"` Subject string `json:"subject"` BodyText string `json:"body_text"` HasAttachments bool `json:"has_attachments"` Labels []string `json:"labels,omitempty"` } func (e *Engine) Evaluate(ctx context.Context, userID string, msg *Message) error { return e.EvaluateMessage(ctx, userID, msg) } func (e *Engine) EvaluateMessage(ctx context.Context, userID string, msg *Message) error { return e.EvaluateMessageEvent(ctx, userID, msg, &EventContext{Type: TriggerMessageReceived}) } func (e *Engine) EvaluateMessageEvent(ctx context.Context, userID string, msg *Message, evt *EventContext) error { rows, err := e.db.Query(ctx, ` SELECT id, name, conditions, actions, workflow FROM mail_rules WHERE user_id = $1 AND is_active = true AND rule_kind = 'rule' ORDER BY priority ASC `, userID) if err != nil { return fmt.Errorf("query rules: %w", err) } defer rows.Close() for rows.Next() { var ( ruleID string name string condJSON []byte actJSON []byte wfJSON []byte ) if err := rows.Scan(&ruleID, &name, &condJSON, &actJSON, &wfJSON); err != nil { e.logger.Error("scan rule", "error", err) continue } wf, err := ParseWorkflow(wfJSON) if err != nil { e.logger.Error("parse workflow", "rule_id", ruleID, "error", err) continue } var results []ActionResult if wf != nil && len(wf.Nodes) > 0 { if !matchesTriggers(wf.Triggers, msg, evt) { continue } startID := wf.findStartNode() if startID == "" { e.logger.Error("workflow missing start", "rule_id", ruleID) continue } execCtx := newExecContext(msg, userID, wf.Variables) if err := e.walkWorkflow(ctx, userID, msg, wf, startID, execCtx, 0); err != nil { e.logger.Error("execute workflow", "rule_id", ruleID, "error", err) } results = execCtx.Results } else { var conditions []Condition var actions []Action json.Unmarshal(condJSON, &conditions) json.Unmarshal(actJSON, &actions) if !matchesAll(conditions, msg) { continue } results = e.executeRuleActions(ctx, ruleID, actions, msg) } e.logger.Info("rule matched", "rule_id", ruleID, "rule_name", name, "message_id", msg.ID) if err := e.recordRuleExecution(ctx, ruleID, msg.ID, results); err != nil { e.logger.Error("record rule execution", "rule_id", ruleID, "message_id", msg.ID, "error", err) } e.db.Exec(ctx, `UPDATE mail_rules SET match_count = match_count + 1 WHERE id = $1`, ruleID) } return nil } func (e *Engine) executeRuleActions(ctx context.Context, ruleID string, actions []Action, msg *Message) []ActionResult { results := make([]ActionResult, 0, len(actions)) for _, action := range actions { err := e.executeAction(ctx, action, msg) results = append(results, actionResultFrom(action, err)) if err != nil { e.logger.Error("action failed", "rule_id", ruleID, "action", action.Type, "error", err) } } return results } func actionResultFrom(action Action, err error) ActionResult { result := ActionResult{ Type: action.Type, Value: action.Value, OK: err == nil, } if err != nil { result.Error = err.Error() } return result } func (e *Engine) recordRuleExecution(ctx context.Context, ruleID, messageID string, results []ActionResult) error { actionsJSON, err := json.Marshal(results) if err != nil { return fmt.Errorf("marshal actions_applied: %w", err) } execError := aggregateActionErrors(results) _, err = e.db.Exec(ctx, ` INSERT INTO rule_executions (rule_id, message_id, actions_applied, error) VALUES ($1, $2, $3, $4) `, ruleID, messageID, actionsJSON, execError) if err != nil { return fmt.Errorf("insert rule_executions: %w", err) } return nil } func aggregateActionErrors(results []ActionResult) string { var parts []string for _, r := range results { if !r.OK && r.Error != "" { parts = append(parts, fmt.Sprintf("%s: %s", r.Type, r.Error)) } } return strings.Join(parts, "; ") } func matchesAll(conditions []Condition, msg *Message) bool { for _, cond := range conditions { if !matchCondition(cond, msg) { return false } } return true } func matchCondition(cond Condition, msg *Message) bool { if cond.Field == "label" { has := messageHasLabel(msg, cond.Value) switch cond.Operator { case "has": return has case "not_has": return !has default: return false } } var fieldValue string switch cond.Field { case "from": fieldValue = msg.From case "to": fieldValue = strings.Join(msg.To, ", ") case "subject": fieldValue = msg.Subject case "body": fieldValue = msg.BodyText case "has_attachment": if msg.HasAttachments { fieldValue = "true" } else { fieldValue = "false" } default: return false } switch cond.Operator { case "regex": re, err := regexp.Compile(cond.Value) if err != nil { return false } return re.MatchString(fieldValue) case "not_regex": re, err := regexp.Compile(cond.Value) if err != nil { return false } return !re.MatchString(fieldValue) } fieldLower := strings.ToLower(fieldValue) valueLower := strings.ToLower(cond.Value) switch cond.Operator { case "contains": return strings.Contains(fieldLower, valueLower) case "equals": return fieldLower == valueLower case "starts_with": return strings.HasPrefix(fieldLower, valueLower) case "ends_with": return strings.HasSuffix(fieldLower, valueLower) case "not_contains": return !strings.Contains(fieldLower, valueLower) default: return false } } func messageHasLabel(msg *Message, label string) bool { labelLower := strings.ToLower(strings.TrimSpace(label)) if labelLower == "" { return false } for _, l := range msg.Labels { if strings.ToLower(l) == labelLower { return true } } return false } func messageToWebhookContext(msg *Message) *webhooks.MessageContext { senderName, senderEmail := parseFromAddress(msg.From) return &webhooks.MessageContext{ SenderName: senderName, SenderEmail: senderEmail, Subject: msg.Subject, BodyText: msg.BodyText, Recipients: strings.Join(msg.To, ", "), HasAttachment: msg.HasAttachments, MessageID: msg.ID, } } func parseFromAddress(from string) (name, email string) { from = strings.TrimSpace(from) if from == "" { return "", "" } if i := strings.LastIndex(from, "<"); i >= 0 { j := strings.LastIndex(from, ">") if j > i { email = strings.TrimSpace(from[i+1 : j]) name = strings.TrimSpace(from[:i]) name = strings.Trim(name, `"`) return name, email } } return "", from } func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message) error { switch action.Type { case "label": _, err := e.db.Exec(ctx, ` UPDATE messages SET labels = array_append(labels, $1), updated_at = NOW() WHERE id = $2 AND NOT ($1 = ANY(labels)) `, action.Value, msg.ID) return err case "move": _, err := e.db.Exec(ctx, ` UPDATE messages SET folder_id = ( SELECT id FROM mail_folders WHERE account_id = ( SELECT account_id FROM messages WHERE id = $2 ) AND name = $1 LIMIT 1 ), updated_at = NOW() WHERE id = $2 `, action.Value, msg.ID) return err case "archive": _, err := e.db.Exec(ctx, ` UPDATE messages SET flags = array_append(flags, '\Archive'), updated_at = NOW() WHERE id = $1 `, msg.ID) return err case "mark_read": _, err := e.db.Exec(ctx, ` UPDATE messages SET flags = array_append(flags, '\Seen'), updated_at = NOW() WHERE id = $1 AND NOT ('\Seen' = ANY(flags)) `, msg.ID) return err case "delete": _, err := e.db.Exec(ctx, ` UPDATE messages SET flags = array_append(flags, '\Deleted'), updated_at = NOW() WHERE id = $1 `, msg.ID) return err case "remove_label": _, err := e.db.Exec(ctx, ` UPDATE messages SET labels = array_remove(labels, $1), updated_at = NOW() WHERE id = $2 `, action.Value, msg.ID) return err case "mark_important": _, err := e.db.Exec(ctx, ` UPDATE messages SET flags = array_append(flags, '\Flagged'), updated_at = NOW() WHERE id = $1 AND NOT ('\Flagged' = ANY(flags)) `, msg.ID) return err case "mark_spam": _, err := e.db.Exec(ctx, ` UPDATE messages SET labels = ( SELECT array_agg(DISTINCT x) FROM unnest(array_append(labels, 'SPAM')) AS x ), updated_at = NOW() WHERE id = $1 `, msg.ID) return err case "star": _, err := e.db.Exec(ctx, ` UPDATE messages SET flags = array_append(flags, '\Flagged'), updated_at = NOW() WHERE id = $1 AND NOT ('\Flagged' = ANY(flags)) `, msg.ID) return err case "notify": e.logger.Info("notification action", "message_id", msg.ID, "body", action.Value) return nil case "reply", "send_mail", "forward": e.logger.Info("deferred mail action", "type", action.Type, "message_id", msg.ID, "value", action.Value) return nil case "webhook": if e.webhookExec == nil { return fmt.Errorf("webhook executor not configured") } return e.webhookExec.Execute(ctx, action.Value, messageToWebhookContext(msg)) default: return fmt.Errorf("unknown action type: %s", action.Type) } }