package rules import ( "context" "encoding/json" "fmt" "strings" ) type ExecContext struct { Variables map[string]string Message *Message UserID string Results []ActionResult } func newExecContext(msg *Message, userID string, vars []ExecVariable) *ExecContext { m := make(map[string]string, len(vars)) for _, v := range vars { m[v.Name] = v.Default } return &ExecContext{ Variables: m, Message: msg, UserID: userID, Results: make([]ActionResult, 0), } } func (e *Engine) ExecuteWorkflow(ctx context.Context, userID string, msg *Message, wf *Workflow, evt *EventContext) ([]ActionResult, error) { if wf == nil { return nil, nil } if wf.Kind == RuleKindFunction { return e.runWorkflowGraph(ctx, userID, msg, wf, newExecContext(msg, userID, wf.Variables)) } if !matchesTriggers(wf.Triggers, msg, evt) { return nil, nil } startID := wf.findStartNode() if startID == "" { return nil, fmt.Errorf("workflow missing start node") } execCtx := newExecContext(msg, userID, wf.Variables) if err := e.walkWorkflow(ctx, userID, msg, wf, startID, execCtx, 0); err != nil { return execCtx.Results, err } return execCtx.Results, nil } const maxWorkflowDepth = 32 func (e *Engine) walkWorkflow(ctx context.Context, userID string, msg *Message, wf *Workflow, nodeID string, execCtx *ExecContext, depth int) error { if depth > maxWorkflowDepth { return fmt.Errorf("workflow depth exceeded") } if nodeID == "" { return nil } nodes := wf.nodeMap() node, ok := nodes[nodeID] if !ok { return fmt.Errorf("unknown node: %s", nodeID) } switch node.Type { case "start": return e.walkWorkflow(ctx, userID, msg, wf, wf.nextDefault(nodeID), execCtx, depth+1) case "label_check": var data LabelCheckNodeData if err := json.Unmarshal(node.Data, &data); err != nil { return fmt.Errorf("label_check node data: %w", err) } cond := Condition{Field: "label", Operator: "has", Value: data.Label} if data.Operator == "not_has" { cond.Operator = "not_has" } handle := "false" if matchCondition(cond, msg) { handle = "true" } return e.walkWorkflow(ctx, userID, msg, wf, wf.nextNode(nodeID, handle), execCtx, depth+1) case "condition": var data ConditionNodeData if err := json.Unmarshal(node.Data, &data); err != nil { return fmt.Errorf("condition node data: %w", err) } cond := Condition{Field: data.Field, Operator: data.Operator, Value: interpolateValue(data.Value, execCtx)} handle := "false" if matchCondition(cond, msg) { handle = "true" } return e.walkWorkflow(ctx, userID, msg, wf, wf.nextNode(nodeID, handle), execCtx, depth+1) case "switch": var data SwitchNodeData if err := json.Unmarshal(node.Data, &data); err != nil { return fmt.Errorf("switch node data: %w", err) } fieldVal := workflowFieldValue(data.Field, msg, execCtx) handle := "default" for i, c := range data.Cases { if strings.EqualFold(fieldVal, c.Value) { handle = fmt.Sprintf("case-%d", i) break } } next := wf.nextNode(nodeID, handle) if next == "" { next = wf.nextNode(nodeID, "default") } return e.walkWorkflow(ctx, userID, msg, wf, next, execCtx, depth+1) case "llm_check": var data LLMCheckNodeData if err := json.Unmarshal(node.Data, &data); err != nil { return fmt.Errorf("llm_check node data: %w", err) } handle := "false" if e.evaluateLLMCheck(ctx, data, msg, execCtx) { handle = "true" } return e.walkWorkflow(ctx, userID, msg, wf, wf.nextNode(nodeID, handle), execCtx, depth+1) case "actions": var data ActionsNodeData if err := json.Unmarshal(node.Data, &data); err != nil { return fmt.Errorf("actions node data: %w", err) } for _, item := range data.Actions { action := Action{Type: item.Type, Value: interpolateValue(item.Value, execCtx)} err := e.executeAction(ctx, action, msg) result := actionResultFrom(action, err) execCtx.Results = append(execCtx.Results, result) if err != nil { e.logger.Error("workflow action failed", "action", action.Type, "error", err) } } return e.walkWorkflow(ctx, userID, msg, wf, wf.nextDefault(nodeID), execCtx, depth+1) case "set_var": var data SetVarNodeData if err := json.Unmarshal(node.Data, &data); err != nil { return fmt.Errorf("set_var node data: %w", err) } execCtx.Variables[data.Name] = interpolateValue(data.Value, execCtx) return e.walkWorkflow(ctx, userID, msg, wf, wf.nextDefault(nodeID), execCtx, depth+1) case "call_function", "call_rule": var data CallRuleNodeData if err := json.Unmarshal(node.Data, &data); err != nil { return fmt.Errorf("call_rule node data: %w", err) } if err := e.invokeSubWorkflow(ctx, userID, msg, data.RuleID, execCtx, depth+1); err != nil { return err } return e.walkWorkflow(ctx, userID, msg, wf, wf.nextDefault(nodeID), execCtx, depth+1) case "end": return nil default: return fmt.Errorf("unknown node type: %s", node.Type) } } func (e *Engine) invokeSubWorkflow(ctx context.Context, userID string, msg *Message, ruleID string, parent *ExecContext, depth int) error { if depth > maxWorkflowDepth { return fmt.Errorf("workflow call depth exceeded") } var ( wfJSON []byte ruleKind string isActive bool ) err := e.db.QueryRow(ctx, ` SELECT workflow, rule_kind, is_active FROM mail_rules WHERE id = $1 AND user_id = $2 `, ruleID, userID).Scan(&wfJSON, &ruleKind, &isActive) if err != nil { return fmt.Errorf("load sub-rule %s: %w", ruleID, err) } if !isActive { return nil } wf, err := ParseWorkflow(wfJSON) if err != nil { return err } if wf == nil { return fmt.Errorf("sub-rule %s has no workflow", ruleID) } childCtx := &ExecContext{ Variables: copyVars(parent.Variables), Message: msg, UserID: userID, Results: parent.Results, } startID := wf.findStartNode() if startID == "" { return fmt.Errorf("sub-rule %s missing start node", ruleID) } return e.walkWorkflow(ctx, userID, msg, wf, startID, childCtx, depth) } func copyVars(src map[string]string) map[string]string { dst := make(map[string]string, len(src)) for k, v := range src { dst[k] = v } return dst } func workflowFieldValue(field string, msg *Message, execCtx *ExecContext) string { if strings.HasPrefix(field, "$") { name := strings.TrimPrefix(field, "$") if v, ok := execCtx.Variables[name]; ok { return v } return "" } switch field { case "from": return msg.From case "to": return strings.Join(msg.To, ", ") case "subject": return msg.Subject case "body": return msg.BodyText case "has_attachment": if msg.HasAttachments { return "true" } return "false" case "label": return strings.Join(msg.Labels, ", ") default: return "" } } func interpolateValue(template string, execCtx *ExecContext) string { if !strings.Contains(template, "{{") { return template } out := template for name, val := range execCtx.Variables { out = strings.ReplaceAll(out, "{{"+name+"}}", val) } if strings.Contains(out, "{{") && execCtx.Message != nil { out = strings.ReplaceAll(out, "{{subject}}", execCtx.Message.Subject) out = strings.ReplaceAll(out, "{{from}}", execCtx.Message.From) } return out } func (e *Engine) evaluateLLMCheck(ctx context.Context, data LLMCheckNodeData, msg *Message, execCtx *ExecContext) bool { _ = ctx prompt := interpolateValue(data.Prompt, execCtx) promptLower := strings.ToLower(prompt) if strings.Contains(promptLower, "spam") { subjectLower := strings.ToLower(msg.Subject) bodyLower := strings.ToLower(msg.BodyText) return strings.Contains(subjectLower, "spam") || strings.Contains(bodyLower, "spam") || strings.Contains(subjectLower, "viagra") || strings.Contains(bodyLower, "lottery") } if strings.Contains(promptLower, "important") || strings.Contains(promptLower, "urgent") { subjectLower := strings.ToLower(msg.Subject) return strings.Contains(subjectLower, "urgent") || strings.Contains(subjectLower, "important") || strings.Contains(subjectLower, "asap") } return false } func (e *Engine) runWorkflowGraph(ctx context.Context, userID string, msg *Message, wf *Workflow, execCtx *ExecContext) ([]ActionResult, error) { startID := wf.findStartNode() if startID == "" { return nil, fmt.Errorf("function workflow missing start node") } if err := e.walkWorkflow(ctx, userID, msg, wf, startID, execCtx, 0); err != nil { return execCtx.Results, err } return execCtx.Results, nil }