ultisuite-backend/internal/mail/rules/engine.go
R3D347HR4Y 1d063237b9
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
feat(transcription): integrate Faster Whisper for Jitsi transcriptions
- Added support for Faster Whisper transcription via Jigasi and Skynet.
- Updated .env.example to include new environment variables for transcription settings.
- Enhanced Jitsi Docker Compose configuration to include Skynet and Jigasi services.
- Introduced new API endpoints for managing organizational folders in the drive service.
- Updated Nextcloud initialization script to enable external file mounting.
- Improved error handling and response structures in the drive API.
- Added new properties for organization settings related to transcription and agenda management.
2026-06-12 19:10:18 +02:00

475 lines
12 KiB
Go

package rules
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"regexp"
"strings"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/webhooks"
)
// WebhookExecutor runs a webhook template against a message/event context.
// The engine calls Execute for "webhook" actions, passing the action's value
// as templateID.
type WebhookExecutor interface {
Execute(ctx context.Context, templateID string, msgCtx *webhooks.MessageContext) error
}
// Engine loads a user's active rules from the database, matches them against
// a message (and optional event context) and applies the resulting actions.
type Engine struct {
db *pgxpool.Pool // pool used to load rules, apply actions and record executions
logger *slog.Logger // set by NewEngine to the default logger tagged component=rules-engine
webhookExec WebhookExecutor // optional; "webhook" actions return an error while this is nil
}
// NewEngine builds an Engine backed by db. The engine logs through the
// default slog logger tagged with component=rules-engine and has no webhook
// executor configured.
func NewEngine(db *pgxpool.Pool) *Engine {
	engine := new(Engine)
	engine.db = db
	engine.logger = slog.Default().With("component", "rules-engine")
	return engine
}
// NewEngineWithWebhooks builds an Engine like NewEngine and installs
// webhookExec as the executor used for "webhook" actions.
func NewEngineWithWebhooks(db *pgxpool.Pool, webhookExec WebhookExecutor) *Engine {
	engine := NewEngine(db)
	engine.SetWebhookExecutor(webhookExec)
	return engine
}
// SetWebhookExecutor replaces the executor used for "webhook" actions.
// Passing nil makes subsequent "webhook" actions fail with an error.
func (e *Engine) SetWebhookExecutor(webhookExec WebhookExecutor) {
e.webhookExec = webhookExec
}
// Rule is the JSON shape of a condition/action rule.
// NOTE(review): the evaluation code visible in this file scans the rule
// columns directly and does not reference this type — presumably it is used
// by API handlers; confirm.
type Rule struct {
ID string `json:"id"`
Name string `json:"name"`
// NOTE(review): rules are loaded ORDER BY priority ASC, so a lower value
// presumably runs first — confirm this field maps to that column.
Priority int `json:"priority"`
Conditions []Condition `json:"conditions"`
Actions []Action `json:"actions"`
}
// Condition is a single predicate over a message or event field. All
// conditions of a rule must hold for the rule to match (see matchesAllEvent).
type Condition struct {
Field string `json:"field"` // from, to, subject, body, has_attachment, label, contact_*, drive_*, calendar_event_* (see matchCondition)
Operator string `json:"operator"` // contains, not_contains, equals, starts_with, ends_with, regex, not_regex; has / not_has for label fields only
Value string `json:"value"` // operand; a regular expression for regex / not_regex, a label name for has / not_has
}
// Action is one side effect applied when a rule matches. executeAction
// dispatches on Type; unknown types produce an error.
type Action struct {
Type string `json:"type"` // label, remove_label, move, archive, delete, mark_read, mark_important, mark_spam, star, notify, reply, send_mail, forward, webhook, drive_*, contact_*, calendar_*
Value string `json:"value"` // type-specific argument: label name, folder name, webhook template ID, ...
}
// ActionResult records the outcome of one executed action. A slice of these
// is JSON-encoded into rule_executions.actions_applied.
type ActionResult struct {
Type string `json:"type"` // copied from Action.Type
Value string `json:"value"` // copied from Action.Value
OK bool `json:"ok"` // true when the action returned no error
Error string `json:"error"` // error text when OK is false, otherwise empty
}
// Message is the view of a mail message that rules are evaluated against.
type Message struct {
ID string `json:"id"` // used as messages.id when applying actions
AccountID string `json:"account_id,omitempty"`
FolderID string `json:"folder_id,omitempty"`
From string `json:"from"` // matched by the "from" condition field
To []string `json:"to"` // joined with ", " before matching the "to" condition field
Subject string `json:"subject"` // matched by the "subject" condition field
BodyText string `json:"body_text"` // matched by the "body" condition field
HasAttachments bool `json:"has_attachments"` // exposed to conditions as "true" / "false"
Labels []string `json:"labels,omitempty"` // checked by "label" has / not_has conditions
}
// Evaluate is an alias for EvaluateMessage.
func (e *Engine) Evaluate(ctx context.Context, userID string, msg *Message) error {
return e.EvaluateMessage(ctx, userID, msg)
}
// EvaluateMessage evaluates userID's rules against msg as a
// TriggerMessageReceived event. See EvaluateMessageEvent for details.
func (e *Engine) EvaluateMessage(ctx context.Context, userID string, msg *Message) error {
return e.EvaluateMessageEvent(ctx, userID, msg, &EventContext{Type: TriggerMessageReceived})
}
// EvaluateMessageEvent runs every active rule of userID (rule_kind = 'rule',
// ordered by ascending priority) against msg in the context of evt.
//
// A rule with a non-empty workflow is executed through the workflow walker
// when its triggers match; otherwise the legacy conditions/actions columns
// are used. For every rule that matched, the action results are recorded in
// rule_executions and the rule's match_count is incremented.
//
// Problems with an individual rule (unparsable workflow, conditions or
// actions, failing actions, failing bookkeeping writes) are logged and do
// not stop evaluation of the remaining rules. An error is returned only when
// the rules cannot be queried or iterating the result set fails.
func (e *Engine) EvaluateMessageEvent(ctx context.Context, userID string, msg *Message, evt *EventContext) error {
	rows, err := e.db.Query(ctx, `
		SELECT id, name, conditions, actions, workflow
		FROM mail_rules
		WHERE user_id = $1 AND is_active = true AND rule_kind = 'rule'
		ORDER BY priority ASC
	`, userID)
	if err != nil {
		return fmt.Errorf("query rules: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			ruleID, name              string
			condJSON, actJSON, wfJSON []byte
		)
		if err := rows.Scan(&ruleID, &name, &condJSON, &actJSON, &wfJSON); err != nil {
			e.logger.Error("scan rule", "error", err)
			continue
		}

		wf, err := ParseWorkflow(wfJSON)
		if err != nil {
			e.logger.Error("parse workflow", "rule_id", ruleID, "error", err)
			continue
		}

		var results []ActionResult
		if wf != nil && len(wf.Nodes) > 0 {
			if !matchesTriggers(wf.Triggers, msg, evt) {
				continue
			}
			startID := wf.findStartNode()
			if startID == "" {
				e.logger.Error("workflow missing start", "rule_id", ruleID)
				continue
			}
			execCtx := newExecContext(msg, userID, wf.Variables, evt)
			if err := e.walkWorkflow(ctx, userID, msg, wf, startID, execCtx, 0); err != nil {
				e.logger.Error("execute workflow", "rule_id", ruleID, "error", err)
			}
			results = execCtx.Results
		} else {
			var (
				conditions []Condition
				actions    []Action
			)
			// An empty/NULL column means "no conditions" / "no actions".
			// Anything else must parse: an empty condition list matches every
			// message, so silently ignoring a decode error would apply the
			// rule's actions to all mail.
			if len(condJSON) > 0 {
				if err := json.Unmarshal(condJSON, &conditions); err != nil {
					e.logger.Error("parse conditions", "rule_id", ruleID, "error", err)
					continue
				}
			}
			if len(actJSON) > 0 {
				if err := json.Unmarshal(actJSON, &actions); err != nil {
					e.logger.Error("parse actions", "rule_id", ruleID, "error", err)
					continue
				}
			}
			if !matchesAllEvent(conditions, msg, evt) {
				continue
			}
			results = e.executeRuleActions(ctx, ruleID, actions, msg, evt)
		}

		e.logger.Info("rule matched", "rule_id", ruleID, "rule_name", name, "message_id", msg.ID)
		if err := e.recordRuleExecution(ctx, ruleID, msg.ID, results); err != nil {
			e.logger.Error("record rule execution", "rule_id", ruleID, "message_id", msg.ID, "error", err)
		}
		// Bookkeeping only: a failed counter update must not abort evaluation.
		if _, err := e.db.Exec(ctx, `UPDATE mail_rules SET match_count = match_count + 1 WHERE id = $1`, ruleID); err != nil {
			e.logger.Error("update match count", "rule_id", ruleID, "error", err)
		}
	}
	// Next returns false both at end of data and on failure; Err tells them apart.
	if err := rows.Err(); err != nil {
		return fmt.Errorf("iterate rules: %w", err)
	}
	return nil
}
// executeRuleActions applies each action of a rule to msg in order and
// returns one ActionResult per action. A failing action is logged and
// recorded in its result; the remaining actions still run.
func (e *Engine) executeRuleActions(ctx context.Context, ruleID string, actions []Action, msg *Message, evt *EventContext) []ActionResult {
	outcomes := make([]ActionResult, len(actions))
	for i, act := range actions {
		actErr := e.executeAction(ctx, act, msg, evt)
		if actErr != nil {
			e.logger.Error("action failed", "rule_id", ruleID, "action", act.Type, "error", actErr)
		}
		outcomes[i] = actionResultFrom(act, actErr)
	}
	return outcomes
}
// actionResultFrom converts an action and the error it produced (nil on
// success) into an ActionResult.
func actionResultFrom(action Action, err error) ActionResult {
	if err != nil {
		return ActionResult{
			Type:  action.Type,
			Value: action.Value,
			OK:    false,
			Error: err.Error(),
		}
	}
	return ActionResult{Type: action.Type, Value: action.Value, OK: true}
}
// recordRuleExecution inserts one rule_executions row for a matched rule.
// results is stored JSON-encoded in actions_applied, and the failed actions'
// messages (see aggregateActionErrors) are stored in error, which is the
// empty string when every action succeeded.
//
// It returns an error when the results cannot be marshalled or the insert
// fails.
func (e *Engine) recordRuleExecution(ctx context.Context, ruleID, messageID string, results []ActionResult) error {
	// A nil slice marshals to JSON null; store an empty array instead so
	// actions_applied always has the same shape for readers.
	if results == nil {
		results = []ActionResult{}
	}
	actionsJSON, err := json.Marshal(results)
	if err != nil {
		return fmt.Errorf("marshal actions_applied: %w", err)
	}

	_, err = e.db.Exec(ctx, `
		INSERT INTO rule_executions (rule_id, message_id, actions_applied, error)
		VALUES ($1, $2, $3, $4)
	`, ruleID, messageID, actionsJSON, aggregateActionErrors(results))
	if err != nil {
		return fmt.Errorf("insert rule_executions: %w", err)
	}
	return nil
}
// aggregateActionErrors joins the errors of all failed results into a single
// "type: message; type: message" string. Results that succeeded, or that
// carry no error text, are skipped; the result is empty when nothing failed.
func aggregateActionErrors(results []ActionResult) string {
	var out strings.Builder
	for i := range results {
		res := &results[i]
		if res.OK || res.Error == "" {
			continue
		}
		if out.Len() > 0 {
			out.WriteString("; ")
		}
		out.WriteString(res.Type)
		out.WriteString(": ")
		out.WriteString(res.Error)
	}
	return out.String()
}
// matchesAll is matchesAllEvent without an event context, so event-derived
// fields are evaluated as empty.
func matchesAll(conditions []Condition, msg *Message) bool {
return matchesAllEvent(conditions, msg, nil)
}
// matchesAllEvent reports whether every condition holds for msg and evt.
// An empty condition list matches.
func matchesAllEvent(conditions []Condition, msg *Message, evt *EventContext) bool {
	for i := range conditions {
		if matchCondition(conditions[i], msg, evt) {
			continue
		}
		return false
	}
	return true
}
// matchCondition reports whether a single condition holds for msg and evt.
//
// The "label" and "contact_label" fields are membership tests and accept only
// the has / not_has operators. Every other known field is reduced to a string
// (booleans become "true"/"false", event fields are empty when evt is nil)
// and compared with the operator: regex / not_regex match case-sensitively
// against the raw value, while contains, not_contains, equals, starts_with
// and ends_with compare case-insensitively. Unknown fields, unknown
// operators and invalid regular expressions never match.
func matchCondition(cond Condition, msg *Message, evt *EventContext) bool {
	// Label-style fields: membership in a list rather than string comparison.
	switch cond.Field {
	case "label", "contact_label":
		var candidates []string
		if cond.Field == "label" {
			candidates = msg.Labels
		} else if evt != nil && evt.ContactLabel != "" {
			candidates = strings.Split(evt.ContactLabel, ", ")
		}
		present := labelListHas(candidates, cond.Value)
		if cond.Operator == "has" {
			return present
		}
		if cond.Operator == "not_has" {
			return !present
		}
		return false
	}

	// Resolve the field to the text the operator is applied to.
	var haystack string
	switch cond.Field {
	case "from":
		haystack = msg.From
	case "to":
		haystack = strings.Join(msg.To, ", ")
	case "subject":
		haystack = msg.Subject
	case "body":
		haystack = msg.BodyText
	case "has_attachment":
		haystack = "false"
		if msg.HasAttachments {
			haystack = "true"
		}
	case "drive_file_name":
		if evt != nil {
			haystack = evt.DriveFileName
		}
	case "drive_file_path":
		if evt != nil {
			haystack = evt.DriveFilePath
		}
	case "drive_mime_type":
		if evt != nil {
			haystack = evt.DriveMimeType
		}
	case "drive_file_size":
		if evt != nil {
			haystack = fmt.Sprintf("%d", evt.DriveFileSize)
		}
	case "drive_is_folder":
		if evt != nil {
			haystack = "false"
			if evt.DriveIsFolder {
				haystack = "true"
			}
		}
	case "contact_name":
		if evt != nil {
			haystack = evt.ContactName
		}
	case "contact_email":
		if evt != nil {
			haystack = evt.ContactEmail
		}
	case "contact_phone":
		if evt != nil {
			haystack = evt.ContactPhone
		}
	case "contact_org":
		if evt != nil {
			haystack = evt.ContactOrg
		}
	case "calendar_event_title":
		if evt != nil {
			haystack = evt.CalendarEventTitle
		}
	case "calendar_event_location":
		if evt != nil {
			haystack = evt.CalendarEventLocation
		}
	case "calendar_event_organizer":
		if evt != nil {
			haystack = evt.CalendarEventOrganizer
		}
	case "calendar_event_attendee":
		if evt != nil {
			haystack = evt.CalendarEventAttendee
		}
	case "calendar_event_all_day":
		if evt != nil {
			haystack = "false"
			if evt.CalendarEventAllDay {
				haystack = "true"
			}
		}
	case "calendar_event_has_video":
		if evt != nil {
			haystack = "false"
			if evt.CalendarEventHasVideo {
				haystack = "true"
			}
		}
	default:
		return false
	}

	// Regex operators work on the raw value; an invalid pattern never matches,
	// not even for not_regex.
	if cond.Operator == "regex" || cond.Operator == "not_regex" {
		pattern, err := regexp.Compile(cond.Value)
		if err != nil {
			return false
		}
		return pattern.MatchString(haystack) == (cond.Operator == "regex")
	}

	// Remaining operators are case-insensitive.
	haystack = strings.ToLower(haystack)
	needle := strings.ToLower(cond.Value)
	switch cond.Operator {
	case "equals":
		return haystack == needle
	case "contains":
		return strings.Contains(haystack, needle)
	case "not_contains":
		return !strings.Contains(haystack, needle)
	case "starts_with":
		return strings.HasPrefix(haystack, needle)
	case "ends_with":
		return strings.HasSuffix(haystack, needle)
	}
	return false
}
// messageHasLabel reports whether msg carries label, using labelListHas for
// the comparison.
func messageHasLabel(msg *Message, label string) bool {
return labelListHas(msg.Labels, label)
}
// labelListHas reports whether label occurs in labels. Both sides are
// compared case-insensitively with surrounding whitespace ignored, so a
// stored " Work " matches a searched "work". A blank label never matches.
func labelListHas(labels []string, label string) bool {
	want := strings.ToLower(strings.TrimSpace(label))
	if want == "" {
		return false
	}
	for _, candidate := range labels {
		// Normalise the entry the same way as the searched label.
		if strings.ToLower(strings.TrimSpace(candidate)) == want {
			return true
		}
	}
	return false
}
// messageToWebhookContext builds the webhook payload context for msg and evt
// by delegating to WebhookContextFromEvent.
func messageToWebhookContext(msg *Message, evt *EventContext) *webhooks.MessageContext {
return WebhookContextFromEvent(evt, msg)
}
// executeAction applies a single action to msg.
//
// Message actions (label, remove_label, move, archive, delete, mark_read,
// mark_important, star, mark_spam) update the messages table directly.
// "webhook" delegates to the configured WebhookExecutor with action.Value as
// the template ID. notify, reply/send_mail/forward and the drive_*, contact_*
// and calendar_* actions are only logged here.
//
// It returns the database or webhook error, an error when a "move" target
// folder cannot be resolved, when no webhook executor is configured, or when
// action.Type is unknown.
func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message, evt *EventContext) error {
	// addFlag sets flag on the message unless it is already present, so
	// re-running a rule never stores the same flag twice.
	addFlag := func(flag string) error {
		_, err := e.db.Exec(ctx, `
			UPDATE messages SET flags = array_append(flags, $2), updated_at = NOW()
			WHERE id = $1 AND NOT ($2 = ANY(flags))
		`, msg.ID, flag)
		return err
	}

	switch action.Type {
	case "label":
		_, err := e.db.Exec(ctx, `
			UPDATE messages SET labels = array_append(labels, $1), updated_at = NOW()
			WHERE id = $2 AND NOT ($1 = ANY(labels))
		`, action.Value, msg.ID)
		return err
	case "remove_label":
		_, err := e.db.Exec(ctx, `
			UPDATE messages SET labels = array_remove(labels, $1), updated_at = NOW()
			WHERE id = $2
		`, action.Value, msg.ID)
		return err
	case "move":
		// Joining against the resolved folder means nothing is updated when
		// the folder does not exist, instead of nulling out folder_id.
		tag, err := e.db.Exec(ctx, `
			UPDATE messages SET folder_id = f.id, updated_at = NOW()
			FROM (
				SELECT id FROM mail_folders
				WHERE account_id = (SELECT account_id FROM messages WHERE id = $2)
				AND name = $1
				LIMIT 1
			) AS f
			WHERE messages.id = $2
		`, action.Value, msg.ID)
		if err != nil {
			return err
		}
		if tag.RowsAffected() == 0 {
			return fmt.Errorf("move: folder %q not found for message %s", action.Value, msg.ID)
		}
		return nil
	case "archive":
		return addFlag(`\Archive`)
	case "mark_read":
		return addFlag(`\Seen`)
	case "delete":
		return addFlag(`\Deleted`)
	case "mark_important", "star":
		return addFlag(`\Flagged`)
	case "mark_spam":
		_, err := e.db.Exec(ctx, `
			UPDATE messages SET labels = (
				SELECT array_agg(DISTINCT x) FROM unnest(array_append(labels, 'SPAM')) AS x
			), updated_at = NOW()
			WHERE id = $1
		`, msg.ID)
		return err
	case "notify":
		e.logger.Info("notification action", "message_id", msg.ID, "body", action.Value)
		return nil
	case "reply", "send_mail", "forward":
		e.logger.Info("deferred mail action", "type", action.Type, "message_id", msg.ID, "value", action.Value)
		return nil
	case "webhook":
		if e.webhookExec == nil {
			return fmt.Errorf("webhook executor not configured")
		}
		return e.webhookExec.Execute(ctx, action.Value, messageToWebhookContext(msg, evt))
	case "drive_move", "drive_rename", "drive_delete", "drive_share", "drive_copy":
		e.logger.Info("deferred drive action", "type", action.Type, "value", action.Value)
		return nil
	case "contact_add_label", "contact_remove_label", "contact_delete":
		e.logger.Info("deferred contact action", "type", action.Type, "value", action.Value)
		return nil
	case "calendar_add_attendee", "calendar_update_title", "calendar_cancel_event", "calendar_notify_attendees":
		e.logger.Info("deferred calendar action", "type", action.Type, "value", action.Value)
		return nil
	default:
		return fmt.Errorf("unknown action type: %s", action.Type)
	}
}