package rules import ( "context" "encoding/json" "fmt" "log/slog" "regexp" "strings" "github.com/jackc/pgx/v5/pgxpool" "github.com/ultisuite/ulti-backend/internal/mail/webhooks" ) type WebhookExecutor interface { Execute(ctx context.Context, templateID string, msgCtx *webhooks.MessageContext) error } type Engine struct { db *pgxpool.Pool logger *slog.Logger webhookExec WebhookExecutor } func NewEngine(db *pgxpool.Pool) *Engine { return &Engine{ db: db, logger: slog.Default().With("component", "rules-engine"), } } func NewEngineWithWebhooks(db *pgxpool.Pool, webhookExec WebhookExecutor) *Engine { e := NewEngine(db) e.webhookExec = webhookExec return e } func (e *Engine) SetWebhookExecutor(webhookExec WebhookExecutor) { e.webhookExec = webhookExec } type Rule struct { ID string `json:"id"` Name string `json:"name"` Priority int `json:"priority"` Conditions []Condition `json:"conditions"` Actions []Action `json:"actions"` } type Condition struct { Field string `json:"field"` // from, to, subject, body, has_attachment Operator string `json:"operator"` // contains, equals, starts_with, ends_with, regex, has, not_has, ... Value string `json:"value"` } type Action struct { Type string `json:"type"` // label, move, archive, delete, mark_read, forward, webhook Value string `json:"value"` // label name, folder name, email, webhook_id } type ActionResult struct { Type string `json:"type"` Value string `json:"value"` OK bool `json:"ok"` Error string `json:"error"` } type Message struct { ID string `json:"id"` AccountID string `json:"account_id,omitempty"` FolderID string `json:"folder_id,omitempty"` From string `json:"from"` To []string `json:"to"` Subject string `json:"subject"` BodyText string `json:"body_text"` HasAttachments bool `json:"has_attachments"` Labels []string `json:"labels,omitempty"` } func (e *Engine) Evaluate(ctx context.Context, userID string, msg *Message) error { return e.EvaluateMessage(ctx, userID, msg) } func (e *Engine) EvaluateMessage(ctx context.Context, userID string, msg *Message) error { return e.EvaluateMessageEvent(ctx, userID, msg, &EventContext{Type: TriggerMessageReceived}) } func (e *Engine) EvaluateMessageEvent(ctx context.Context, userID string, msg *Message, evt *EventContext) error { rows, err := e.db.Query(ctx, ` SELECT id, name, conditions, actions, workflow FROM mail_rules WHERE user_id = $1 AND is_active = true AND rule_kind = 'rule' ORDER BY priority ASC `, userID) if err != nil { return fmt.Errorf("query rules: %w", err) } defer rows.Close() for rows.Next() { var ( ruleID string name string condJSON []byte actJSON []byte wfJSON []byte ) if err := rows.Scan(&ruleID, &name, &condJSON, &actJSON, &wfJSON); err != nil { e.logger.Error("scan rule", "error", err) continue } wf, err := ParseWorkflow(wfJSON) if err != nil { e.logger.Error("parse workflow", "rule_id", ruleID, "error", err) continue } var results []ActionResult if wf != nil && len(wf.Nodes) > 0 { if !matchesTriggers(wf.Triggers, msg, evt) { continue } startID := wf.findStartNode() if startID == "" { e.logger.Error("workflow missing start", "rule_id", ruleID) continue } execCtx := newExecContext(msg, userID, wf.Variables, evt) if err := e.walkWorkflow(ctx, userID, msg, wf, startID, execCtx, 0); err != nil { e.logger.Error("execute workflow", "rule_id", ruleID, "error", err) } results = execCtx.Results } else { var conditions []Condition var actions []Action json.Unmarshal(condJSON, &conditions) json.Unmarshal(actJSON, &actions) if !matchesAllEvent(conditions, msg, evt) { continue } results = e.executeRuleActions(ctx, ruleID, actions, msg, evt) } e.logger.Info("rule matched", "rule_id", ruleID, "rule_name", name, "message_id", msg.ID) if err := e.recordRuleExecution(ctx, ruleID, msg.ID, results); err != nil { e.logger.Error("record rule execution", "rule_id", ruleID, "message_id", msg.ID, "error", err) } e.db.Exec(ctx, `UPDATE mail_rules SET match_count = match_count + 1 WHERE id = $1`, ruleID) } return nil } func (e *Engine) executeRuleActions(ctx context.Context, ruleID string, actions []Action, msg *Message, evt *EventContext) []ActionResult { results := make([]ActionResult, 0, len(actions)) for _, action := range actions { err := e.executeAction(ctx, action, msg, evt) results = append(results, actionResultFrom(action, err)) if err != nil { e.logger.Error("action failed", "rule_id", ruleID, "action", action.Type, "error", err) } } return results } func actionResultFrom(action Action, err error) ActionResult { result := ActionResult{ Type: action.Type, Value: action.Value, OK: err == nil, } if err != nil { result.Error = err.Error() } return result } func (e *Engine) recordRuleExecution(ctx context.Context, ruleID, messageID string, results []ActionResult) error { actionsJSON, err := json.Marshal(results) if err != nil { return fmt.Errorf("marshal actions_applied: %w", err) } execError := aggregateActionErrors(results) _, err = e.db.Exec(ctx, ` INSERT INTO rule_executions (rule_id, message_id, actions_applied, error) VALUES ($1, $2, $3, $4) `, ruleID, messageID, actionsJSON, execError) if err != nil { return fmt.Errorf("insert rule_executions: %w", err) } return nil } func aggregateActionErrors(results []ActionResult) string { var parts []string for _, r := range results { if !r.OK && r.Error != "" { parts = append(parts, fmt.Sprintf("%s: %s", r.Type, r.Error)) } } return strings.Join(parts, "; ") } func matchesAll(conditions []Condition, msg *Message) bool { return matchesAllEvent(conditions, msg, nil) } func matchesAllEvent(conditions []Condition, msg *Message, evt *EventContext) bool { for _, cond := range conditions { if !matchCondition(cond, msg, evt) { return false } } return true } func matchCondition(cond Condition, msg *Message, evt *EventContext) bool { if cond.Field == "label" || cond.Field == "contact_label" { var labels []string if cond.Field == "label" { labels = msg.Labels } else if evt != nil { if evt.ContactLabel != "" { labels = strings.Split(evt.ContactLabel, ", ") } } has := labelListHas(labels, cond.Value) switch cond.Operator { case "has": return has case "not_has": return !has default: return false } } var fieldValue string switch cond.Field { case "from": fieldValue = msg.From case "to": fieldValue = strings.Join(msg.To, ", ") case "subject": fieldValue = msg.Subject case "body": fieldValue = msg.BodyText case "has_attachment": if msg.HasAttachments { fieldValue = "true" } else { fieldValue = "false" } case "drive_file_name": if evt != nil { fieldValue = evt.DriveFileName } case "drive_file_path": if evt != nil { fieldValue = evt.DriveFilePath } case "drive_mime_type": if evt != nil { fieldValue = evt.DriveMimeType } case "drive_file_size": if evt != nil { fieldValue = fmt.Sprintf("%d", evt.DriveFileSize) } case "drive_is_folder": if evt != nil { if evt.DriveIsFolder { fieldValue = "true" } else { fieldValue = "false" } } case "contact_name": if evt != nil { fieldValue = evt.ContactName } case "contact_email": if evt != nil { fieldValue = evt.ContactEmail } case "contact_phone": if evt != nil { fieldValue = evt.ContactPhone } case "contact_org": if evt != nil { fieldValue = evt.ContactOrg } default: return false } switch cond.Operator { case "regex": re, err := regexp.Compile(cond.Value) if err != nil { return false } return re.MatchString(fieldValue) case "not_regex": re, err := regexp.Compile(cond.Value) if err != nil { return false } return !re.MatchString(fieldValue) } fieldLower := strings.ToLower(fieldValue) valueLower := strings.ToLower(cond.Value) switch cond.Operator { case "contains": return strings.Contains(fieldLower, valueLower) case "equals": return fieldLower == valueLower case "starts_with": return strings.HasPrefix(fieldLower, valueLower) case "ends_with": return strings.HasSuffix(fieldLower, valueLower) case "not_contains": return !strings.Contains(fieldLower, valueLower) default: return false } } func messageHasLabel(msg *Message, label string) bool { return labelListHas(msg.Labels, label) } func labelListHas(labels []string, label string) bool { labelLower := strings.ToLower(strings.TrimSpace(label)) if labelLower == "" { return false } for _, l := range labels { if strings.ToLower(l) == labelLower { return true } } return false } func messageToWebhookContext(msg *Message, evt *EventContext) *webhooks.MessageContext { return WebhookContextFromEvent(evt, msg) } func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message, evt *EventContext) error { switch action.Type { case "label": _, err := e.db.Exec(ctx, ` UPDATE messages SET labels = array_append(labels, $1), updated_at = NOW() WHERE id = $2 AND NOT ($1 = ANY(labels)) `, action.Value, msg.ID) return err case "move": _, err := e.db.Exec(ctx, ` UPDATE messages SET folder_id = ( SELECT id FROM mail_folders WHERE account_id = ( SELECT account_id FROM messages WHERE id = $2 ) AND name = $1 LIMIT 1 ), updated_at = NOW() WHERE id = $2 `, action.Value, msg.ID) return err case "archive": _, err := e.db.Exec(ctx, ` UPDATE messages SET flags = array_append(flags, '\Archive'), updated_at = NOW() WHERE id = $1 `, msg.ID) return err case "mark_read": _, err := e.db.Exec(ctx, ` UPDATE messages SET flags = array_append(flags, '\Seen'), updated_at = NOW() WHERE id = $1 AND NOT ('\Seen' = ANY(flags)) `, msg.ID) return err case "delete": _, err := e.db.Exec(ctx, ` UPDATE messages SET flags = array_append(flags, '\Deleted'), updated_at = NOW() WHERE id = $1 `, msg.ID) return err case "remove_label": _, err := e.db.Exec(ctx, ` UPDATE messages SET labels = array_remove(labels, $1), updated_at = NOW() WHERE id = $2 `, action.Value, msg.ID) return err case "mark_important": _, err := e.db.Exec(ctx, ` UPDATE messages SET flags = array_append(flags, '\Flagged'), updated_at = NOW() WHERE id = $1 AND NOT ('\Flagged' = ANY(flags)) `, msg.ID) return err case "mark_spam": _, err := e.db.Exec(ctx, ` UPDATE messages SET labels = ( SELECT array_agg(DISTINCT x) FROM unnest(array_append(labels, 'SPAM')) AS x ), updated_at = NOW() WHERE id = $1 `, msg.ID) return err case "star": _, err := e.db.Exec(ctx, ` UPDATE messages SET flags = array_append(flags, '\Flagged'), updated_at = NOW() WHERE id = $1 AND NOT ('\Flagged' = ANY(flags)) `, msg.ID) return err case "notify": e.logger.Info("notification action", "message_id", msg.ID, "body", action.Value) return nil case "reply", "send_mail", "forward": e.logger.Info("deferred mail action", "type", action.Type, "message_id", msg.ID, "value", action.Value) return nil case "webhook": if e.webhookExec == nil { return fmt.Errorf("webhook executor not configured") } return e.webhookExec.Execute(ctx, action.Value, messageToWebhookContext(msg, evt)) case "drive_move", "drive_rename", "drive_delete", "drive_share", "drive_copy": e.logger.Info("deferred drive action", "type", action.Type, "value", action.Value) return nil case "contact_add_label", "contact_remove_label", "contact_delete": e.logger.Info("deferred contact action", "type", action.Type, "value", action.Value) return nil default: return fmt.Errorf("unknown action type: %s", action.Type) } }