Wire automation dispatcher to IMAP sync, drive mutations, and contact CRUD. Add webhook event_types and mail/drive/contacts scope filters (migration 30).
440 lines
12 KiB
Go
440 lines
12 KiB
Go
package rules
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"regexp"
|
|
"strings"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/ultisuite/ulti-backend/internal/mail/webhooks"
|
|
)
|
|
|
|
type WebhookExecutor interface {
|
|
Execute(ctx context.Context, templateID string, msgCtx *webhooks.MessageContext) error
|
|
}
|
|
|
|
type Engine struct {
|
|
db *pgxpool.Pool
|
|
logger *slog.Logger
|
|
webhookExec WebhookExecutor
|
|
}
|
|
|
|
func NewEngine(db *pgxpool.Pool) *Engine {
|
|
return &Engine{
|
|
db: db,
|
|
logger: slog.Default().With("component", "rules-engine"),
|
|
}
|
|
}
|
|
|
|
func NewEngineWithWebhooks(db *pgxpool.Pool, webhookExec WebhookExecutor) *Engine {
|
|
e := NewEngine(db)
|
|
e.webhookExec = webhookExec
|
|
return e
|
|
}
|
|
|
|
func (e *Engine) SetWebhookExecutor(webhookExec WebhookExecutor) {
|
|
e.webhookExec = webhookExec
|
|
}
|
|
|
|
type Rule struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Priority int `json:"priority"`
|
|
Conditions []Condition `json:"conditions"`
|
|
Actions []Action `json:"actions"`
|
|
}
|
|
|
|
// Condition is a single predicate over a message or event field.
type Condition struct {
	// Field names the value under test. matchCondition handles: from, to,
	// subject, body, has_attachment, label, contact_label, contact_name,
	// contact_email, contact_phone, contact_org, drive_file_name,
	// drive_file_path, drive_mime_type, drive_file_size, drive_is_folder.
	Field string `json:"field"`
	// Operator selects the comparison. matchCondition handles: contains,
	// not_contains, equals, starts_with, ends_with, regex, not_regex, and,
	// for label and contact_label only, has and not_has.
	Operator string `json:"operator"`
	// Value is the operand the field is compared against.
	Value string `json:"value"`
}
|
|
|
|
// Action is one step applied when a rule matches.
type Action struct {
	// Type selects the operation, e.g. label, move, archive, delete,
	// mark_read, forward, webhook.
	Type string `json:"type"`
	// Value is the operation's argument: a label name, folder name, email
	// address or webhook id, depending on Type.
	Value string `json:"value"`
}
|
|
|
|
// ActionResult is the outcome of executing one Action. A slice of these is
// serialised into rule_executions.actions_applied.
type ActionResult struct {
	// Type and Value echo the executed action.
	Type  string `json:"type"`
	Value string `json:"value"`
	// OK is true when the action returned no error.
	OK bool `json:"ok"`
	// Error holds the error text of a failed action and is empty otherwise.
	Error string `json:"error"`
}
|
|
|
|
// Message is the view of a mail message that rules are evaluated against.
type Message struct {
	ID        string `json:"id"`
	AccountID string `json:"account_id,omitempty"`
	FolderID  string `json:"folder_id,omitempty"`

	// Header and body fields readable by conditions.
	From     string   `json:"from"`
	To       []string `json:"to"`
	Subject  string   `json:"subject"`
	BodyText string   `json:"body_text"`

	HasAttachments bool     `json:"has_attachments"`
	Labels         []string `json:"labels,omitempty"`
}
|
|
|
|
func (e *Engine) Evaluate(ctx context.Context, userID string, msg *Message) error {
|
|
return e.EvaluateMessage(ctx, userID, msg)
|
|
}
|
|
|
|
func (e *Engine) EvaluateMessage(ctx context.Context, userID string, msg *Message) error {
|
|
return e.EvaluateMessageEvent(ctx, userID, msg, &EventContext{Type: TriggerMessageReceived})
|
|
}
|
|
|
|
func (e *Engine) EvaluateMessageEvent(ctx context.Context, userID string, msg *Message, evt *EventContext) error {
|
|
rows, err := e.db.Query(ctx, `
|
|
SELECT id, name, conditions, actions, workflow
|
|
FROM mail_rules
|
|
WHERE user_id = $1 AND is_active = true AND rule_kind = 'rule'
|
|
ORDER BY priority ASC
|
|
`, userID)
|
|
if err != nil {
|
|
return fmt.Errorf("query rules: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var (
|
|
ruleID string
|
|
name string
|
|
condJSON []byte
|
|
actJSON []byte
|
|
wfJSON []byte
|
|
)
|
|
if err := rows.Scan(&ruleID, &name, &condJSON, &actJSON, &wfJSON); err != nil {
|
|
e.logger.Error("scan rule", "error", err)
|
|
continue
|
|
}
|
|
|
|
wf, err := ParseWorkflow(wfJSON)
|
|
if err != nil {
|
|
e.logger.Error("parse workflow", "rule_id", ruleID, "error", err)
|
|
continue
|
|
}
|
|
|
|
var results []ActionResult
|
|
|
|
if wf != nil && len(wf.Nodes) > 0 {
|
|
if !matchesTriggers(wf.Triggers, msg, evt) {
|
|
continue
|
|
}
|
|
startID := wf.findStartNode()
|
|
if startID == "" {
|
|
e.logger.Error("workflow missing start", "rule_id", ruleID)
|
|
continue
|
|
}
|
|
execCtx := newExecContext(msg, userID, wf.Variables, evt)
|
|
if err := e.walkWorkflow(ctx, userID, msg, wf, startID, execCtx, 0); err != nil {
|
|
e.logger.Error("execute workflow", "rule_id", ruleID, "error", err)
|
|
}
|
|
results = execCtx.Results
|
|
} else {
|
|
var conditions []Condition
|
|
var actions []Action
|
|
json.Unmarshal(condJSON, &conditions)
|
|
json.Unmarshal(actJSON, &actions)
|
|
if !matchesAllEvent(conditions, msg, evt) {
|
|
continue
|
|
}
|
|
results = e.executeRuleActions(ctx, ruleID, actions, msg, evt)
|
|
}
|
|
|
|
e.logger.Info("rule matched", "rule_id", ruleID, "rule_name", name, "message_id", msg.ID)
|
|
if err := e.recordRuleExecution(ctx, ruleID, msg.ID, results); err != nil {
|
|
e.logger.Error("record rule execution", "rule_id", ruleID, "message_id", msg.ID, "error", err)
|
|
}
|
|
e.db.Exec(ctx, `UPDATE mail_rules SET match_count = match_count + 1 WHERE id = $1`, ruleID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *Engine) executeRuleActions(ctx context.Context, ruleID string, actions []Action, msg *Message, evt *EventContext) []ActionResult {
|
|
results := make([]ActionResult, 0, len(actions))
|
|
for _, action := range actions {
|
|
err := e.executeAction(ctx, action, msg, evt)
|
|
results = append(results, actionResultFrom(action, err))
|
|
if err != nil {
|
|
e.logger.Error("action failed", "rule_id", ruleID, "action", action.Type, "error", err)
|
|
}
|
|
}
|
|
return results
|
|
}
|
|
|
|
func actionResultFrom(action Action, err error) ActionResult {
|
|
result := ActionResult{
|
|
Type: action.Type,
|
|
Value: action.Value,
|
|
OK: err == nil,
|
|
}
|
|
if err != nil {
|
|
result.Error = err.Error()
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (e *Engine) recordRuleExecution(ctx context.Context, ruleID, messageID string, results []ActionResult) error {
|
|
actionsJSON, err := json.Marshal(results)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal actions_applied: %w", err)
|
|
}
|
|
|
|
execError := aggregateActionErrors(results)
|
|
|
|
_, err = e.db.Exec(ctx, `
|
|
INSERT INTO rule_executions (rule_id, message_id, actions_applied, error)
|
|
VALUES ($1, $2, $3, $4)
|
|
`, ruleID, messageID, actionsJSON, execError)
|
|
if err != nil {
|
|
return fmt.Errorf("insert rule_executions: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func aggregateActionErrors(results []ActionResult) string {
|
|
var parts []string
|
|
for _, r := range results {
|
|
if !r.OK && r.Error != "" {
|
|
parts = append(parts, fmt.Sprintf("%s: %s", r.Type, r.Error))
|
|
}
|
|
}
|
|
return strings.Join(parts, "; ")
|
|
}
|
|
|
|
func matchesAll(conditions []Condition, msg *Message) bool {
|
|
return matchesAllEvent(conditions, msg, nil)
|
|
}
|
|
|
|
func matchesAllEvent(conditions []Condition, msg *Message, evt *EventContext) bool {
|
|
for _, cond := range conditions {
|
|
if !matchCondition(cond, msg, evt) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func matchCondition(cond Condition, msg *Message, evt *EventContext) bool {
|
|
if cond.Field == "label" || cond.Field == "contact_label" {
|
|
var labels []string
|
|
if cond.Field == "label" {
|
|
labels = msg.Labels
|
|
} else if evt != nil {
|
|
if evt.ContactLabel != "" {
|
|
labels = strings.Split(evt.ContactLabel, ", ")
|
|
}
|
|
}
|
|
has := labelListHas(labels, cond.Value)
|
|
switch cond.Operator {
|
|
case "has":
|
|
return has
|
|
case "not_has":
|
|
return !has
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
var fieldValue string
|
|
switch cond.Field {
|
|
case "from":
|
|
fieldValue = msg.From
|
|
case "to":
|
|
fieldValue = strings.Join(msg.To, ", ")
|
|
case "subject":
|
|
fieldValue = msg.Subject
|
|
case "body":
|
|
fieldValue = msg.BodyText
|
|
case "has_attachment":
|
|
if msg.HasAttachments {
|
|
fieldValue = "true"
|
|
} else {
|
|
fieldValue = "false"
|
|
}
|
|
case "drive_file_name":
|
|
if evt != nil {
|
|
fieldValue = evt.DriveFileName
|
|
}
|
|
case "drive_file_path":
|
|
if evt != nil {
|
|
fieldValue = evt.DriveFilePath
|
|
}
|
|
case "drive_mime_type":
|
|
if evt != nil {
|
|
fieldValue = evt.DriveMimeType
|
|
}
|
|
case "drive_file_size":
|
|
if evt != nil {
|
|
fieldValue = fmt.Sprintf("%d", evt.DriveFileSize)
|
|
}
|
|
case "drive_is_folder":
|
|
if evt != nil {
|
|
if evt.DriveIsFolder {
|
|
fieldValue = "true"
|
|
} else {
|
|
fieldValue = "false"
|
|
}
|
|
}
|
|
case "contact_name":
|
|
if evt != nil {
|
|
fieldValue = evt.ContactName
|
|
}
|
|
case "contact_email":
|
|
if evt != nil {
|
|
fieldValue = evt.ContactEmail
|
|
}
|
|
case "contact_phone":
|
|
if evt != nil {
|
|
fieldValue = evt.ContactPhone
|
|
}
|
|
case "contact_org":
|
|
if evt != nil {
|
|
fieldValue = evt.ContactOrg
|
|
}
|
|
default:
|
|
return false
|
|
}
|
|
|
|
switch cond.Operator {
|
|
case "regex":
|
|
re, err := regexp.Compile(cond.Value)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return re.MatchString(fieldValue)
|
|
case "not_regex":
|
|
re, err := regexp.Compile(cond.Value)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return !re.MatchString(fieldValue)
|
|
}
|
|
|
|
fieldLower := strings.ToLower(fieldValue)
|
|
valueLower := strings.ToLower(cond.Value)
|
|
|
|
switch cond.Operator {
|
|
case "contains":
|
|
return strings.Contains(fieldLower, valueLower)
|
|
case "equals":
|
|
return fieldLower == valueLower
|
|
case "starts_with":
|
|
return strings.HasPrefix(fieldLower, valueLower)
|
|
case "ends_with":
|
|
return strings.HasSuffix(fieldLower, valueLower)
|
|
case "not_contains":
|
|
return !strings.Contains(fieldLower, valueLower)
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func messageHasLabel(msg *Message, label string) bool {
|
|
return labelListHas(msg.Labels, label)
|
|
}
|
|
|
|
// labelListHas reports whether labels contains label. The comparison is
// case-insensitive and ignores surrounding whitespace on both the wanted
// label and each list entry, so " Work" in the list matches "work". An
// empty (or all-whitespace) label never matches.
func labelListHas(labels []string, label string) bool {
	want := strings.ToLower(strings.TrimSpace(label))
	if want == "" {
		return false
	}
	for _, l := range labels {
		// Entries are normalised the same way as the wanted label;
		// trimming only one side made padded entries unmatchable.
		if strings.ToLower(strings.TrimSpace(l)) == want {
			return true
		}
	}
	return false
}
|
|
|
|
func messageToWebhookContext(msg *Message, evt *EventContext) *webhooks.MessageContext {
|
|
return WebhookContextFromEvent(evt, msg)
|
|
}
|
|
|
|
func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message, evt *EventContext) error {
|
|
switch action.Type {
|
|
case "label":
|
|
_, err := e.db.Exec(ctx, `
|
|
UPDATE messages SET labels = array_append(labels, $1), updated_at = NOW()
|
|
WHERE id = $2 AND NOT ($1 = ANY(labels))
|
|
`, action.Value, msg.ID)
|
|
return err
|
|
case "move":
|
|
_, err := e.db.Exec(ctx, `
|
|
UPDATE messages SET folder_id = (
|
|
SELECT id FROM mail_folders WHERE account_id = (
|
|
SELECT account_id FROM messages WHERE id = $2
|
|
) AND name = $1 LIMIT 1
|
|
), updated_at = NOW()
|
|
WHERE id = $2
|
|
`, action.Value, msg.ID)
|
|
return err
|
|
case "archive":
|
|
_, err := e.db.Exec(ctx, `
|
|
UPDATE messages SET flags = array_append(flags, '\Archive'), updated_at = NOW()
|
|
WHERE id = $1
|
|
`, msg.ID)
|
|
return err
|
|
case "mark_read":
|
|
_, err := e.db.Exec(ctx, `
|
|
UPDATE messages SET flags = array_append(flags, '\Seen'), updated_at = NOW()
|
|
WHERE id = $1 AND NOT ('\Seen' = ANY(flags))
|
|
`, msg.ID)
|
|
return err
|
|
case "delete":
|
|
_, err := e.db.Exec(ctx, `
|
|
UPDATE messages SET flags = array_append(flags, '\Deleted'), updated_at = NOW()
|
|
WHERE id = $1
|
|
`, msg.ID)
|
|
return err
|
|
case "remove_label":
|
|
_, err := e.db.Exec(ctx, `
|
|
UPDATE messages SET labels = array_remove(labels, $1), updated_at = NOW()
|
|
WHERE id = $2
|
|
`, action.Value, msg.ID)
|
|
return err
|
|
case "mark_important":
|
|
_, err := e.db.Exec(ctx, `
|
|
UPDATE messages SET flags = array_append(flags, '\Flagged'), updated_at = NOW()
|
|
WHERE id = $1 AND NOT ('\Flagged' = ANY(flags))
|
|
`, msg.ID)
|
|
return err
|
|
case "mark_spam":
|
|
_, err := e.db.Exec(ctx, `
|
|
UPDATE messages SET labels = (
|
|
SELECT array_agg(DISTINCT x) FROM unnest(array_append(labels, 'SPAM')) AS x
|
|
), updated_at = NOW()
|
|
WHERE id = $1
|
|
`, msg.ID)
|
|
return err
|
|
case "star":
|
|
_, err := e.db.Exec(ctx, `
|
|
UPDATE messages SET flags = array_append(flags, '\Flagged'), updated_at = NOW()
|
|
WHERE id = $1 AND NOT ('\Flagged' = ANY(flags))
|
|
`, msg.ID)
|
|
return err
|
|
case "notify":
|
|
e.logger.Info("notification action", "message_id", msg.ID, "body", action.Value)
|
|
return nil
|
|
case "reply", "send_mail", "forward":
|
|
e.logger.Info("deferred mail action", "type", action.Type, "message_id", msg.ID, "value", action.Value)
|
|
return nil
|
|
case "webhook":
|
|
if e.webhookExec == nil {
|
|
return fmt.Errorf("webhook executor not configured")
|
|
}
|
|
return e.webhookExec.Execute(ctx, action.Value, messageToWebhookContext(msg, evt))
|
|
case "drive_move", "drive_rename", "drive_delete", "drive_share", "drive_copy":
|
|
e.logger.Info("deferred drive action", "type", action.Type, "value", action.Value)
|
|
return nil
|
|
case "contact_add_label", "contact_remove_label", "contact_delete":
|
|
e.logger.Info("deferred contact action", "type", action.Type, "value", action.Value)
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("unknown action type: %s", action.Type)
|
|
}
|
|
}
|