From 082cac36b2b7a005250e191fb36ac9d8c9bfa134 Mon Sep 17 00:00:00 2001 From: R3D347HR4Y Date: Sun, 7 Jun 2026 15:51:47 +0200 Subject: [PATCH] feat(automation): dispatch rules/webhooks on mail, drive, contacts Wire automation dispatcher to IMAP sync, drive mutations, and contact CRUD. Add webhook event_types and mail/drive/contacts scope filters (migration 30). --- cmd/ultid/main.go | 13 +- internal/api/contacts/automation_hooks.go | 31 +++ internal/api/contacts/handlers.go | 22 +- internal/api/drive/automation_hooks.go | 40 +++ internal/api/drive/handlers.go | 22 +- internal/api/drive/service.go | 7 + internal/api/mail/service.go | 58 +++- internal/api/mail/service_webhooks.go | 28 +- internal/api/mail/validate.go | 23 ++ internal/api/mail/webhook_scope.go | 50 ++++ internal/automation/dispatcher.go | 250 ++++++++++++++++++ internal/automation/scope.go | 81 ++++++ internal/mail/imap/pipeline.go | 30 ++- internal/mail/imap/sync.go | 8 +- internal/mail/rules/engine.go | 116 +++++--- internal/mail/rules/engine_test.go | 10 +- internal/mail/rules/simulate.go | 4 +- internal/mail/rules/webhook_context.go | 73 +++++ internal/mail/rules/workflow.go | 62 ++++- internal/mail/rules/workflow_exec.go | 50 +++- internal/mail/rules/workflow_simulate.go | 8 +- internal/mail/webhooks/executor.go | 24 ++ .../000030_webhook_automation_scope.down.sql | 5 + .../000030_webhook_automation_scope.up.sql | 5 + 24 files changed, 911 insertions(+), 109 deletions(-) create mode 100644 internal/api/contacts/automation_hooks.go create mode 100644 internal/api/drive/automation_hooks.go create mode 100644 internal/api/mail/webhook_scope.go create mode 100644 internal/automation/dispatcher.go create mode 100644 internal/automation/scope.go create mode 100644 internal/mail/rules/webhook_context.go create mode 100644 migrations/000030_webhook_automation_scope.down.sql create mode 100644 migrations/000030_webhook_automation_scope.up.sql diff --git a/cmd/ultid/main.go b/cmd/ultid/main.go index 967afca..2e70a26 100644 --- a/cmd/ultid/main.go +++ b/cmd/ultid/main.go @@ -30,6 +30,7 @@ import ( meetapi "github.com/ultisuite/ulti-backend/internal/api/meet" "github.com/ultisuite/ulti-backend/internal/api/middleware" photosapi "github.com/ultisuite/ulti-backend/internal/api/photos" + "github.com/ultisuite/ulti-backend/internal/automation" "github.com/ultisuite/ulti-backend/internal/authentik" "github.com/ultisuite/ulti-backend/internal/auth" "github.com/ultisuite/ulti-backend/internal/dbmigrate" @@ -159,7 +160,9 @@ func main() { hub := realtime.NewHub(verifierHolder, pool) healthChecker := observability.NewHealthChecker(cfg, pool, rdb) - rulesEngine := rules.NewEngineWithWebhooks(pool, webhooks.NewExecutor(pool)) + hookExec := webhooks.NewExecutor(pool) + rulesEngine := rules.NewEngineWithWebhooks(pool, hookExec) + autoDispatcher := automation.NewDispatcher(pool, rulesEngine, hookExec) oauthRedirect := cfg.MailOAuthRedirectURL if oauthRedirect == "" { @@ -181,7 +184,7 @@ func main() { syncWorker := imapsync.NewSyncWorker(pool, cfg.MailSyncInterval, credentialManager, mailOAuthSvc, imapsync.SyncDeps{ Storage: attachmentStorage, AttachBucket: cfg.MailAttachmentsBucket, - Rules: rulesEngine, + Automation: autoDispatcher, Hub: hub, }) go syncWorker.Start(ctx) @@ -228,10 +231,14 @@ func main() { var driveHandler *drive.Handler var driveSvc *drive.Service + var contactsHandler *contacts.Handler if ncClient != nil { driveSvc = drive.NewService(ncClient, hub, pool) + driveSvc.SetAutomation(autoDispatcher) driveHandler = drive.NewHandlerWithService(driveSvc) mailHandler.SetDriveUploader(&drivebridge.Bridge{Svc: driveSvc}) + contactsHandler = contacts.NewHandler(ncClient, pool) + contactsHandler.SetAutomation(autoDispatcher) } if ncClient != nil && cfg.OnlyOfficeEnabled && driveSvc != nil { officeSvc := office.NewService(ncClient, office.Config{ @@ -269,7 +276,7 @@ func main() { if driveHandler != nil { r.Mount("/api/v1/drive", driveHandler.Routes()) r.Mount("/api/v1/calendar", calendar.NewHandler(ncClient, meetCfg).Routes()) - r.Mount("/api/v1/contacts", contacts.NewHandler(ncClient, pool).Routes()) + r.Mount("/api/v1/contacts", contactsHandler.Routes()) } if meetCfg != nil { r.Mount("/api/v1/meet", meetapi.NewHandler(meetCfg).Routes()) diff --git a/internal/api/contacts/automation_hooks.go b/internal/api/contacts/automation_hooks.go new file mode 100644 index 0000000..0acbe06 --- /dev/null +++ b/internal/api/contacts/automation_hooks.go @@ -0,0 +1,31 @@ +package contacts + +import ( + "context" + + "github.com/ultisuite/ulti-backend/internal/automation" + "github.com/ultisuite/ulti-backend/internal/mail/rules" + "github.com/ultisuite/ulti-backend/internal/nextcloud" +) + +type contactAutomation interface { + OnContactEvent(ctx context.Context, externalUserID string, trigger rules.TriggerType, payload automation.ContactPayload) +} + +func (h *Handler) SetAutomation(d contactAutomation) { + h.automation = d +} + +func contactPayloadFrom(bookID string, contact *nextcloud.Contact) automation.ContactPayload { + if contact == nil { + return automation.ContactPayload{BookID: bookID} + } + return automation.ContactPayload{ + ID: contact.UID, + BookID: bookID, + Name: contact.FullName, + Email: contact.Email, + Phone: contact.Phone, + Org: contact.Org, + } +} diff --git a/internal/api/contacts/handlers.go b/internal/api/contacts/handlers.go index b95e384..a3c2d1b 100644 --- a/internal/api/contacts/handlers.go +++ b/internal/api/contacts/handlers.go @@ -16,15 +16,18 @@ import ( "github.com/ultisuite/ulti-backend/internal/api/middleware" "github.com/ultisuite/ulti-backend/internal/api/query" "github.com/ultisuite/ulti-backend/internal/auth" + "github.com/ultisuite/ulti-backend/internal/automation" "github.com/ultisuite/ulti-backend/internal/contacts/discovery" + "github.com/ultisuite/ulti-backend/internal/mail/rules" "github.com/ultisuite/ulti-backend/internal/nextcloud" "github.com/ultisuite/ulti-backend/internal/permission" ) type Handler struct { - svc *Service - discovery *discovery.Service - logger *slog.Logger + svc *Service + discovery *discovery.Service + automation contactAutomation + logger *slog.Logger } func NewHandler(nc *nextcloud.Client, db *pgxpool.Pool) *Handler { @@ -216,11 +219,15 @@ func (h *Handler) CreateContact(w http.ResponseWriter, r *http.Request) { return } - created, err := h.svc.CreateContact(r.Context(), ncUser, chi.URLParam(r, "bookID"), &contact) + bookID := chi.URLParam(r, "bookID") + created, err := h.svc.CreateContact(r.Context(), ncUser, bookID, &contact) if err != nil { h.writeContactServiceError(w, r, "create contact", err) return } + if h.automation != nil { + h.automation.OnContactEvent(r.Context(), claims.Sub, rules.TriggerContactCreated, contactPayloadFrom(bookID, created)) + } apiresponse.WriteJSON(w, http.StatusCreated, created) } @@ -284,6 +291,10 @@ func (h *Handler) UpdateContact(w http.ResponseWriter, r *http.Request) { h.writeContactServiceError(w, r, "update contact", err) return } + if h.automation != nil { + contact.Path = contactPath + h.automation.OnContactEvent(r.Context(), claims.Sub, rules.TriggerContactUpdated, contactPayloadFrom("", &contact)) + } apiresponse.WriteJSON(w, http.StatusOK, map[string]any{"etag": etag}) } @@ -380,6 +391,9 @@ func (h *Handler) DeleteContact(w http.ResponseWriter, r *http.Request) { h.writeContactServiceError(w, r, "delete contact", err) return } + if h.automation != nil { + h.automation.OnContactEvent(r.Context(), claims.Sub, rules.TriggerContactDeleted, automation.ContactPayload{ID: contactPath}) + } w.WriteHeader(http.StatusNoContent) } diff --git a/internal/api/drive/automation_hooks.go b/internal/api/drive/automation_hooks.go new file mode 100644 index 0000000..55bae0f --- /dev/null +++ b/internal/api/drive/automation_hooks.go @@ -0,0 +1,40 @@ +package drive + +import ( + "context" + "path" + + "github.com/ultisuite/ulti-backend/internal/automation" + "github.com/ultisuite/ulti-backend/internal/mail/rules" + "github.com/ultisuite/ulti-backend/internal/nextcloud" +) + +func (s *Service) SetAutomation(d driveAutomation) { + s.automation = d +} + +func (s *Service) afterDriveFileEvent(ctx context.Context, externalUserID string, trigger rules.TriggerType, filePath string, isFolder bool) { + normalized := nextcloud.NormalizeClientPath(filePath) + s.notifyFileChanged(externalUserID, normalized) + if s.automation == nil { + return + } + s.automation.OnDriveEvent(ctx, externalUserID, trigger, automation.DrivePayloadFromPath(normalized, isFolder)) +} + +func (s *Service) afterDriveShareEvent(ctx context.Context, externalUserID string, filePath string) { + normalized := nextcloud.NormalizeClientPath(filePath) + s.notifyShareUpdated(externalUserID, normalized) + if s.automation == nil { + return + } + s.automation.OnDriveEvent(ctx, externalUserID, rules.TriggerDriveShareUpdated, automation.DrivePayloadFromPath(normalized, false)) +} + +func renamedPath(oldPath, newName string) string { + dir := path.Dir(nextcloud.NormalizeClientPath(oldPath)) + if dir == "." || dir == "" { + dir = "/" + } + return nextcloud.NormalizeClientPath(path.Join(dir, newName)) +} diff --git a/internal/api/drive/handlers.go b/internal/api/drive/handlers.go index cdcc66e..99d9713 100644 --- a/internal/api/drive/handlers.go +++ b/internal/api/drive/handlers.go @@ -16,6 +16,7 @@ import ( "github.com/ultisuite/ulti-backend/internal/api/middleware" "github.com/ultisuite/ulti-backend/internal/api/query" "github.com/ultisuite/ulti-backend/internal/auth" + "github.com/ultisuite/ulti-backend/internal/mail/rules" "github.com/ultisuite/ulti-backend/internal/nextcloud" "github.com/ultisuite/ulti-backend/internal/permission" "github.com/ultisuite/ulti-backend/internal/realtime" @@ -174,7 +175,7 @@ func (h *Handler) Upload(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } - h.svc.notifyFileChanged(claims.Sub, path) + h.svc.afterDriveFileEvent(r.Context(), claims.Sub, rules.TriggerDriveFileCreated, path, false) apiresponse.WriteJSON(w, http.StatusCreated, map[string]string{"status": "uploaded", "path": path}) } @@ -245,7 +246,7 @@ func (h *Handler) DeleteFile(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } - h.svc.notifyFileChanged(claims.Sub, path) + h.svc.afterDriveFileEvent(r.Context(), claims.Sub, rules.TriggerDriveFileDeleted, path, false) w.WriteHeader(http.StatusNoContent) } @@ -266,6 +267,7 @@ func (h *Handler) CreateFolder(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } + h.svc.afterDriveFileEvent(r.Context(), claims.Sub, rules.TriggerDriveFileCreated, path, true) w.WriteHeader(http.StatusCreated) } @@ -293,7 +295,7 @@ func (h *Handler) Move(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } - h.svc.notifyFileChanged(claims.Sub, req.Destination) + h.svc.afterDriveFileEvent(r.Context(), claims.Sub, rules.TriggerDriveFileMoved, req.Destination, false) w.WriteHeader(http.StatusNoContent) } @@ -321,6 +323,7 @@ func (h *Handler) Copy(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } + h.svc.afterDriveFileEvent(r.Context(), claims.Sub, rules.TriggerDriveFileCreated, req.Destination, false) w.WriteHeader(http.StatusNoContent) } @@ -348,6 +351,7 @@ func (h *Handler) Rename(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } + h.svc.afterDriveFileEvent(r.Context(), claims.Sub, rules.TriggerDriveFileUpdated, renamedPath(req.Path, req.NewName), false) w.WriteHeader(http.StatusNoContent) } @@ -465,7 +469,7 @@ func (h *Handler) CreateShare(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } - h.svc.notifyShareUpdated(claims.Sub, req.Path) + h.svc.afterDriveShareEvent(r.Context(), claims.Sub, req.Path) apiresponse.WriteJSON(w, http.StatusCreated, share) } @@ -598,7 +602,7 @@ func (h *Handler) UpdateShare(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } - h.svc.notifyShareUpdated(claims.Sub, share.Path) + h.svc.afterDriveShareEvent(r.Context(), claims.Sub, share.Path) apiresponse.WriteJSON(w, http.StatusOK, share) } @@ -613,7 +617,7 @@ func (h *Handler) DeleteShare(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } - h.svc.notifyShareUpdated(claims.Sub, "") + h.svc.afterDriveShareEvent(r.Context(), claims.Sub, "") w.WriteHeader(http.StatusNoContent) } @@ -635,7 +639,7 @@ func (h *Handler) RestoreTrash(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } - h.svc.notifyFileChanged(claims.Sub, req.Name) + h.svc.afterDriveFileEvent(r.Context(), claims.Sub, rules.TriggerDriveFileCreated, req.Name, false) w.WriteHeader(http.StatusNoContent) } @@ -691,7 +695,7 @@ func (h *Handler) SetFavorite(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } - h.svc.notifyFileChanged(claims.Sub, req.Path) + h.svc.afterDriveFileEvent(r.Context(), claims.Sub, rules.TriggerDriveFileUpdated, req.Path, false) w.WriteHeader(http.StatusNoContent) } @@ -715,7 +719,7 @@ func (h *Handler) CreateNewFile(w http.ResponseWriter, r *http.Request) { writeDriveError(w, r, err) return } - h.svc.notifyFileChanged(claims.Sub, target) + h.svc.afterDriveFileEvent(r.Context(), claims.Sub, rules.TriggerDriveFileCreated, target, false) apiresponse.WriteJSON(w, http.StatusCreated, map[string]string{"path": target}) } diff --git a/internal/api/drive/service.go b/internal/api/drive/service.go index 12ed1f6..c8d768c 100644 --- a/internal/api/drive/service.go +++ b/internal/api/drive/service.go @@ -17,6 +17,8 @@ import ( "github.com/ultisuite/ulti-backend/internal/api/paginate" "github.com/ultisuite/ulti-backend/internal/api/query" "github.com/ultisuite/ulti-backend/internal/auth" + "github.com/ultisuite/ulti-backend/internal/automation" + "github.com/ultisuite/ulti-backend/internal/mail/rules" "github.com/ultisuite/ulti-backend/internal/nextcloud" "github.com/ultisuite/ulti-backend/internal/realtime" ) @@ -33,10 +35,15 @@ type Service struct { nc *nextcloud.Client hub *realtime.Hub db *pgxpool.Pool + automation driveAutomation maxUploadBytes int64 quotaReserveByte int64 } +type driveAutomation interface { + OnDriveEvent(ctx context.Context, externalUserID string, trigger rules.TriggerType, payload automation.DrivePayload) +} + func NewService(nc *nextcloud.Client, hub *realtime.Hub, db *pgxpool.Pool) *Service { return &Service{ nc: nc, diff --git a/internal/api/mail/service.go b/internal/api/mail/service.go index 77b3eff..c559a17 100644 --- a/internal/api/mail/service.go +++ b/internal/api/mail/service.go @@ -721,7 +721,8 @@ func (s *Service) ListWebhooks(ctx context.Context, externalID string, params qu } rows, err := s.db.Query(ctx, ` - SELECT id, name, url, method, version, is_active FROM webhook_templates + SELECT id, name, url, method, version, is_active, body_template, event_types, mail_scope, drive_scope, contacts_scope + FROM webhook_templates WHERE user_id = (SELECT id FROM users WHERE external_id = $1) ORDER BY created_at ASC LIMIT $2 OFFSET $3 @@ -733,14 +734,20 @@ func (s *Service) ListWebhooks(ctx context.Context, externalID string, params qu webhooks := make([]map[string]any, 0) for rows.Next() { - var id, name, url, method string + var id, name, url, method, bodyTemplate string var version int var isActive bool - if err := rows.Scan(&id, &name, &url, &method, &version, &isActive); err != nil { + var eventTypes, mailScope, driveScope, contactsScope []byte + if err := rows.Scan(&id, &name, &url, &method, &version, &isActive, &bodyTemplate, &eventTypes, &mailScope, &driveScope, &contactsScope); err != nil { return WebhooksList{}, err } webhooks = append(webhooks, map[string]any{ "id": id, "name": name, "url": url, "method": method, "version": version, "is_active": isActive, + "body_template": bodyTemplate, + "event_types": jsonRawOrEmptyArray(eventTypes), + "mail_scope": jsonRawOrEmptyObject(mailScope), + "drive_scope": jsonRawOrEmptyObject(driveScope), + "contacts_scope": jsonRawOrEmptyObject(contactsScope), }) } if err := rows.Err(); err != nil { @@ -753,15 +760,52 @@ func (s *Service) ListWebhooks(ctx context.Context, externalID string, params qu }, nil } +func jsonRawOrEmptyArray(raw []byte) json.RawMessage { + if len(raw) == 0 { + return json.RawMessage("[]") + } + return json.RawMessage(raw) +} + +func jsonRawOrEmptyObject(raw []byte) json.RawMessage { + if len(raw) == 0 { + return json.RawMessage("{}") + } + return json.RawMessage(raw) +} + func (s *Service) CreateWebhook(ctx context.Context, externalID string, req *createWebhookRequest, method string, maxRetries int) (string, error) { headersJSON, _ := json.Marshal(req.Headers) + eventTypesJSON, err := marshalWebhookEventTypes(req.EventTypes) + if err != nil { + return "", err + } + mailScopeJSON, err := marshalWebhookMailScope(req.MailScope) + if err != nil { + return "", err + } + driveScopeJSON, err := marshalWebhookDriveScope(req.DriveScope) + if err != nil { + return "", err + } + contactsScopeJSON, err := marshalWebhookContactsScope(req.ContactsScope) + if err != nil { + return "", err + } var id string - err := s.db.QueryRow(ctx, ` - INSERT INTO webhook_templates (user_id, name, url, method, headers, body_template, version, signing_secret, max_retries) - VALUES ((SELECT id FROM users WHERE external_id = $1), $2, $3, $4, $5, $6, 1, $7, $8) + err = s.db.QueryRow(ctx, ` + INSERT INTO webhook_templates ( + user_id, name, url, method, headers, body_template, version, signing_secret, max_retries, + event_types, mail_scope, drive_scope, contacts_scope + ) + VALUES ( + (SELECT id FROM users WHERE external_id = $1), $2, $3, $4, $5, $6, 1, $7, $8, + $9, $10, $11, $12 + ) RETURNING id - `, externalID, req.Name, req.URL, method, headersJSON, req.BodyTemplate, req.SigningSecret, maxRetries).Scan(&id) + `, externalID, req.Name, req.URL, method, headersJSON, req.BodyTemplate, req.SigningSecret, maxRetries, + eventTypesJSON, mailScopeJSON, driveScopeJSON, contactsScopeJSON).Scan(&id) if err != nil { return "", err } diff --git a/internal/api/mail/service_webhooks.go b/internal/api/mail/service_webhooks.go index 1f119f9..d6f7ca4 100644 --- a/internal/api/mail/service_webhooks.go +++ b/internal/api/mail/service_webhooks.go @@ -19,6 +19,23 @@ func (s *Service) UpdateWebhook(ctx context.Context, externalID, webhookID strin defer tx.Rollback(ctx) var version int + eventTypesJSON, err := marshalWebhookEventTypes(req.EventTypes) + if err != nil { + return err + } + mailScopeJSON, err := marshalWebhookMailScope(req.MailScope) + if err != nil { + return err + } + driveScopeJSON, err := marshalWebhookDriveScope(req.DriveScope) + if err != nil { + return err + } + contactsScopeJSON, err := marshalWebhookContactsScope(req.ContactsScope) + if err != nil { + return err + } + err = tx.QueryRow(ctx, ` UPDATE webhook_templates SET @@ -29,12 +46,17 @@ func (s *Service) UpdateWebhook(ctx context.Context, externalID, webhookID strin body_template = $5, signing_secret = $6, max_retries = $7, + event_types = $8, + mail_scope = $9, + drive_scope = $10, + contacts_scope = $11, version = version + 1, updated_at = NOW() - WHERE id = $8 - AND user_id = (SELECT id FROM users WHERE external_id = $9) + WHERE id = $12 + AND user_id = (SELECT id FROM users WHERE external_id = $13) RETURNING version - `, req.Name, req.URL, method, headersJSON, req.BodyTemplate, req.SigningSecret, maxRetries, webhookID, externalID).Scan(&version) + `, req.Name, req.URL, method, headersJSON, req.BodyTemplate, req.SigningSecret, maxRetries, + eventTypesJSON, mailScopeJSON, driveScopeJSON, contactsScopeJSON, webhookID, externalID).Scan(&version) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return ErrNotFound diff --git a/internal/api/mail/validate.go b/internal/api/mail/validate.go index 923c7c3..8208825 100644 --- a/internal/api/mail/validate.go +++ b/internal/api/mail/validate.go @@ -605,6 +605,25 @@ type createWebhookRequest struct { BodyTemplate string `json:"body_template"` SigningSecret string `json:"signing_secret"` MaxRetries *int `json:"max_retries"` + EventTypes []string `json:"event_types"` + MailScope *webhookMailScope `json:"mail_scope"` + DriveScope *webhookDriveScope `json:"drive_scope"` + ContactsScope *webhookContactsScope `json:"contacts_scope"` +} + +type webhookMailScope struct { + AllAccounts bool `json:"all_accounts"` + AccountIDs []string `json:"account_ids"` +} + +type webhookDriveScope struct { + AllFolders bool `json:"all_folders"` + FolderPaths []string `json:"folder_paths"` +} + +type webhookContactsScope struct { + AllBooks bool `json:"all_books"` + BookIDs []string `json:"book_ids"` } type updateWebhookRequest struct { @@ -615,6 +634,10 @@ type updateWebhookRequest struct { BodyTemplate string `json:"body_template"` SigningSecret string `json:"signing_secret"` MaxRetries *int `json:"max_retries"` + EventTypes []string `json:"event_types"` + MailScope *webhookMailScope `json:"mail_scope"` + DriveScope *webhookDriveScope `json:"drive_scope"` + ContactsScope *webhookContactsScope `json:"contacts_scope"` } type previewWebhookMessageRequest struct { diff --git a/internal/api/mail/webhook_scope.go b/internal/api/mail/webhook_scope.go new file mode 100644 index 0000000..7bda7bb --- /dev/null +++ b/internal/api/mail/webhook_scope.go @@ -0,0 +1,50 @@ +package mail + +import ( + "encoding/json" + + "github.com/ultisuite/ulti-backend/internal/automation" +) + +func marshalWebhookEventTypes(types []string) ([]byte, error) { + if types == nil { + types = []string{} + } + return json.Marshal(types) +} + +func marshalWebhookMailScope(scope *webhookMailScope) ([]byte, error) { + out := automation.MailScope{AllAccounts: true} + if scope != nil { + out.AllAccounts = scope.AllAccounts + out.AccountIDs = scope.AccountIDs + if out.AccountIDs == nil { + out.AccountIDs = []string{} + } + } + return json.Marshal(out) +} + +func marshalWebhookDriveScope(scope *webhookDriveScope) ([]byte, error) { + out := automation.DriveScope{AllFolders: true} + if scope != nil { + out.AllFolders = scope.AllFolders + out.FolderPaths = scope.FolderPaths + if out.FolderPaths == nil { + out.FolderPaths = []string{} + } + } + return json.Marshal(out) +} + +func marshalWebhookContactsScope(scope *webhookContactsScope) ([]byte, error) { + out := automation.ContactsScope{AllBooks: true} + if scope != nil { + out.AllBooks = scope.AllBooks + out.BookIDs = scope.BookIDs + if out.BookIDs == nil { + out.BookIDs = []string{} + } + } + return json.Marshal(out) +} diff --git a/internal/automation/dispatcher.go b/internal/automation/dispatcher.go new file mode 100644 index 0000000..375784b --- /dev/null +++ b/internal/automation/dispatcher.go @@ -0,0 +1,250 @@ +package automation + +import ( + "context" + "encoding/json" + "log/slog" + "path" + "strings" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/ultisuite/ulti-backend/internal/mail/rules" + "github.com/ultisuite/ulti-backend/internal/mail/webhooks" + "github.com/ultisuite/ulti-backend/internal/nextcloud" +) + +type Dispatcher struct { + db *pgxpool.Pool + rules *rules.Engine + hooks *webhooks.Executor + logger *slog.Logger +} + +func NewDispatcher(db *pgxpool.Pool, rulesEngine *rules.Engine, hookExec *webhooks.Executor) *Dispatcher { + return &Dispatcher{ + db: db, + rules: rulesEngine, + hooks: hookExec, + logger: slog.Default().With("component", "automation-dispatcher"), + } +} + +type DrivePayload struct { + FilePath string + FileName string + MimeType string + FileSize int64 + IsFolder bool +} + +type ContactPayload struct { + ID string + BookID string + Name string + Email string + Phone string + Org string + Labels []string +} + +func DrivePayloadFromPath(filePath string, isFolder bool) DrivePayload { + normalized := nextcloud.NormalizeClientPath(filePath) + name := path.Base(strings.TrimSuffix(normalized, "/")) + if name == "" || name == "." { + name = normalized + } + return DrivePayload{ + FilePath: normalized, + FileName: name, + IsFolder: isFolder, + } +} + +func (d *Dispatcher) OnMailCreated(ctx context.Context, userID, accountID, messageID string, msg *rules.Message) { + if d == nil || userID == "" || msg == nil { + return + } + evt := &rules.EventContext{Type: rules.TriggerMessageReceived} + if msg.AccountID == "" { + msg.AccountID = accountID + } + if msg.ID == "" { + msg.ID = messageID + } + d.runRules(ctx, userID, msg, evt) + d.dispatchWebhooks(ctx, userID, string(rules.TriggerMessageReceived), evt, msg, accountID, "", "") +} + +func (d *Dispatcher) OnDriveEvent(ctx context.Context, externalUserID string, trigger rules.TriggerType, payload DrivePayload) { + if d == nil || externalUserID == "" { + return + } + userID, err := d.resolveUserID(ctx, externalUserID) + if err != nil { + d.logger.Error("resolve user for drive automation", "error", err, "sub", externalUserID) + return + } + evt := driveEventContext(trigger, payload) + msg := &rules.Message{} + d.runRules(ctx, userID, msg, evt) + d.dispatchWebhooks(ctx, userID, string(trigger), evt, msg, "", payload.FilePath, "") +} + +func (d *Dispatcher) OnContactEvent(ctx context.Context, externalUserID string, trigger rules.TriggerType, payload ContactPayload) { + if d == nil || externalUserID == "" { + return + } + userID, err := d.resolveUserID(ctx, externalUserID) + if err != nil { + d.logger.Error("resolve user for contact automation", "error", err, "sub", externalUserID) + return + } + evt := contactEventContext(trigger, payload) + msg := &rules.Message{} + d.runRules(ctx, userID, msg, evt) + d.dispatchWebhooks(ctx, userID, string(trigger), evt, msg, "", "", payload.BookID) +} + +func (d *Dispatcher) runRules(ctx context.Context, userID string, msg *rules.Message, evt *rules.EventContext) { + if d.rules == nil { + return + } + if err := d.rules.EvaluateMessageEvent(ctx, userID, msg, evt); err != nil { + d.logger.Error("rules evaluation", "error", err, "user_id", userID, "trigger", evt.Type) + } +} + +type webhookTemplateRow struct { + id string + eventTypes []byte + mailScope []byte + driveScope []byte + contactsScope []byte +} + +func (d *Dispatcher) dispatchWebhooks( + ctx context.Context, + userID string, + eventType string, + evt *rules.EventContext, + msg *rules.Message, + accountID string, + drivePath string, + bookID string, +) { + if d.hooks == nil || d.db == nil { + return + } + rows, err := d.db.Query(ctx, ` + SELECT id, event_types, mail_scope, drive_scope, contacts_scope + FROM webhook_templates + WHERE user_id = $1 AND is_active = true + `, userID) + if err != nil { + d.logger.Error("list webhook templates", "error", err) + return + } + defer rows.Close() + + msgCtx := rules.WebhookContextFromEvent(evt, msg) + for rows.Next() { + var row webhookTemplateRow + if err := rows.Scan(&row.id, &row.eventTypes, &row.mailScope, &row.driveScope, &row.contactsScope); err != nil { + d.logger.Error("scan webhook template", "error", err) + continue + } + if !webhookMatchesEvent(row, eventType) { + continue + } + if !webhookMatchesScope(row, accountID, drivePath, bookID) { + continue + } + if err := d.hooks.Execute(ctx, row.id, msgCtx); err != nil { + d.logger.Error("webhook dispatch", "template_id", row.id, "error", err) + } + } +} + +func webhookMatchesEvent(row webhookTemplateRow, eventType string) bool { + var types []string + if len(row.eventTypes) > 0 { + _ = json.Unmarshal(row.eventTypes, &types) + } + if len(types) == 0 { + return false + } + for _, t := range types { + if t == eventType { + return true + } + } + return false +} + +func webhookMatchesScope(row webhookTemplateRow, accountID, drivePath, bookID string) bool { + var mailScope MailScope + var driveScope DriveScope + var contactsScope ContactsScope + _ = json.Unmarshal(row.mailScope, &mailScope) + _ = json.Unmarshal(row.driveScope, &driveScope) + _ = json.Unmarshal(row.contactsScope, &contactsScope) + + if accountID != "" { + return AllowsMailScope(mailScope, accountID) + } + if drivePath != "" { + return AllowsDriveScope(driveScope, drivePath) + } + if bookID != "" { + return AllowsContactsScope(contactsScope, bookID) + } + return true +} + +func (d *Dispatcher) resolveUserID(ctx context.Context, externalID string) (string, error) { + var userID string + err := d.db.QueryRow(ctx, `SELECT id FROM users WHERE external_id = $1`, externalID).Scan(&userID) + return userID, err +} + +func driveEventContext(trigger rules.TriggerType, p DrivePayload) *rules.EventContext { + return &rules.EventContext{ + Type: trigger, + FolderPath: path.Dir(p.FilePath), + DriveFileName: p.FileName, + DriveFilePath: p.FilePath, + DriveMimeType: p.MimeType, + DriveFileSize: p.FileSize, + DriveIsFolder: p.IsFolder, + } +} + +func contactEventContext(trigger rules.TriggerType, p ContactPayload) *rules.EventContext { + return &rules.EventContext{ + Type: trigger, + ContactLabel: strings.Join(p.Labels, ", "), + ContactID: p.ID, + ContactName: p.Name, + ContactEmail: p.Email, + ContactPhone: p.Phone, + ContactOrg: p.Org, + ContactBookID: p.BookID, + } +} + +func EventDomain(trigger rules.TriggerType) string { + switch trigger { + case rules.TriggerDriveFileCreated, rules.TriggerDriveFileUpdated, rules.TriggerDriveFileDeleted, + rules.TriggerDriveFileMoved, rules.TriggerDriveShareUpdated: + return "drive" + case rules.TriggerContactCreated, rules.TriggerContactUpdated, rules.TriggerContactDeleted: + return "contacts" + default: + return "mail" + } +} + +func NowISO() string { + return time.Now().UTC().Format(time.RFC3339) +} diff --git a/internal/automation/scope.go b/internal/automation/scope.go new file mode 100644 index 0000000..5126bad --- /dev/null +++ b/internal/automation/scope.go @@ -0,0 +1,81 @@ +package automation + +import ( + "github.com/ultisuite/ulti-backend/internal/apitokens" +) + +type MailScope struct { + AllAccounts bool `json:"all_accounts"` + AccountIDs []string `json:"account_ids"` +} + +type DriveScope struct { + AllFolders bool `json:"all_folders"` + FolderPaths []string `json:"folder_paths"` +} + +type ContactsScope struct { + AllBooks bool `json:"all_books"` + BookIDs []string `json:"book_ids"` +} + +func AllowsMailScope(scope MailScope, accountID string) bool { + if accountID == "" { + return true + } + if scope.AllAccounts { + return true + } + for _, id := range scope.AccountIDs { + if id == accountID { + return true + } + } + return false +} + +func AllowsDriveScope(scope DriveScope, filePath string) bool { + if scope.AllFolders { + return true + } + target := apitokens.NormalizeDriveScopePath(filePath) + if target == "" { + return true + } + for _, allowed := range scope.FolderPaths { + if apitokens.NormalizeDriveScopePath(allowed) == "/" { + return true + } + if drivePathWithinScope(target, allowed) { + return true + } + } + return false +} + +func drivePathWithinScope(target, allowed string) bool { + target = apitokens.NormalizeDriveScopePath(target) + allowed = apitokens.NormalizeDriveScopePath(allowed) + if allowed == "/" { + return true + } + if target == allowed { + return true + } + return len(target) > len(allowed) && target[:len(allowed)+1] == allowed+"/" +} + +func AllowsContactsScope(scope ContactsScope, bookID string) bool { + if bookID == "" { + return true + } + if scope.AllBooks { + return true + } + for _, id := range scope.BookIDs { + if id == bookID { + return true + } + } + return false +} diff --git a/internal/mail/imap/pipeline.go b/internal/mail/imap/pipeline.go index bb73ace..76fff31 100644 --- a/internal/mail/imap/pipeline.go +++ b/internal/mail/imap/pipeline.go @@ -19,18 +19,20 @@ type postSyncEvent struct { // syncPipeline runs rules and realtime notifications after message sync. type syncPipeline struct { - db *pgxpool.Pool - logger *slog.Logger - rules *rules.Engine - hub *realtime.Hub + db *pgxpool.Pool + logger *slog.Logger + rules *rules.Engine + automation MailAutomation + hub *realtime.Hub } -func newSyncPipeline(db *pgxpool.Pool, rulesEngine *rules.Engine, hub *realtime.Hub) *syncPipeline { +func newSyncPipeline(db *pgxpool.Pool, rulesEngine *rules.Engine, automation MailAutomation, hub *realtime.Hub) *syncPipeline { return &syncPipeline{ - db: db, - logger: slog.Default().With("component", "imap-pipeline"), - rules: rulesEngine, - hub: hub, + db: db, + logger: slog.Default().With("component", "imap-pipeline"), + rules: rulesEngine, + automation: automation, + hub: hub, } } @@ -40,12 +42,16 @@ func (p *syncPipeline) handle(ctx context.Context, ev postSyncEvent) { return } - if p.rules != nil && ev.kind == "created" { + if ev.kind == "created" { msg, err := p.loadRuleMessage(ctx, ev.messageID) if err != nil { p.logger.Error("load message for rules", "message_id", ev.messageID, "error", err) - } else if err := p.rules.EvaluateMessage(ctx, ev.userID, msg); err != nil { - p.logger.Error("rules evaluation failed", "message_id", ev.messageID, "error", err) + } else if p.automation != nil { + p.automation.OnMailCreated(ctx, ev.userID, ev.accountID, ev.messageID, msg) + } else if p.rules != nil { + if err := p.rules.EvaluateMessage(ctx, ev.userID, msg); err != nil { + p.logger.Error("rules evaluation failed", "message_id", ev.messageID, "error", err) + } } } diff --git a/internal/mail/imap/sync.go b/internal/mail/imap/sync.go index c43e21d..b9d3ffe 100644 --- a/internal/mail/imap/sync.go +++ b/internal/mail/imap/sync.go @@ -26,11 +26,17 @@ import ( "github.com/ultisuite/ulti-backend/internal/realtime" ) +// MailAutomation runs rules and outbound webhooks after a new message is synced. +type MailAutomation interface { + OnMailCreated(ctx context.Context, userID, accountID, messageID string, msg *rules.Message) +} + // SyncDeps optional services wired into the IMAP sync worker. type SyncDeps struct { Storage *storage.Client AttachBucket string Rules *rules.Engine + Automation MailAutomation Hub *realtime.Hub } @@ -54,7 +60,7 @@ func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *creden oauth: oauthSvc, storage: deps.Storage, attachBucket: deps.AttachBucket, - pipeline: newSyncPipeline(db, deps.Rules, deps.Hub), + pipeline: newSyncPipeline(db, deps.Rules, deps.Automation, deps.Hub), } } diff --git a/internal/mail/rules/engine.go b/internal/mail/rules/engine.go index 8d87a2a..898adff 100644 --- a/internal/mail/rules/engine.go +++ b/internal/mail/rules/engine.go @@ -127,7 +127,7 @@ func (e *Engine) EvaluateMessageEvent(ctx context.Context, userID string, msg *M e.logger.Error("workflow missing start", "rule_id", ruleID) continue } - execCtx := newExecContext(msg, userID, wf.Variables) + execCtx := newExecContext(msg, userID, wf.Variables, evt) if err := e.walkWorkflow(ctx, userID, msg, wf, startID, execCtx, 0); err != nil { e.logger.Error("execute workflow", "rule_id", ruleID, "error", err) } @@ -137,10 +137,10 @@ func (e *Engine) EvaluateMessageEvent(ctx context.Context, userID string, msg *M var actions []Action json.Unmarshal(condJSON, &conditions) json.Unmarshal(actJSON, &actions) - if !matchesAll(conditions, msg) { + if !matchesAllEvent(conditions, msg, evt) { continue } - results = e.executeRuleActions(ctx, ruleID, actions, msg) + results = e.executeRuleActions(ctx, ruleID, actions, msg, evt) } e.logger.Info("rule matched", "rule_id", ruleID, "rule_name", name, "message_id", msg.ID) @@ -153,10 +153,10 @@ func (e *Engine) EvaluateMessageEvent(ctx context.Context, userID string, msg *M return nil } -func (e *Engine) executeRuleActions(ctx context.Context, ruleID string, actions []Action, msg *Message) []ActionResult { +func (e *Engine) executeRuleActions(ctx context.Context, ruleID string, actions []Action, msg *Message, evt *EventContext) []ActionResult { results := make([]ActionResult, 0, len(actions)) for _, action := range actions { - err := e.executeAction(ctx, action, msg) + err := e.executeAction(ctx, action, msg, evt) results = append(results, actionResultFrom(action, err)) if err != nil { e.logger.Error("action failed", "rule_id", ruleID, "action", action.Type, "error", err) @@ -206,17 +206,29 @@ func aggregateActionErrors(results []ActionResult) string { } func matchesAll(conditions []Condition, msg *Message) bool { + return matchesAllEvent(conditions, msg, nil) +} + +func matchesAllEvent(conditions []Condition, msg *Message, evt *EventContext) bool { for _, cond := range conditions { - if !matchCondition(cond, msg) { + if !matchCondition(cond, msg, evt) { return false } } return true } -func matchCondition(cond Condition, msg *Message) bool { - if cond.Field == "label" { - has := messageHasLabel(msg, cond.Value) +func matchCondition(cond Condition, msg *Message, evt *EventContext) bool { + if cond.Field == "label" || cond.Field == "contact_label" { + var labels []string + if cond.Field == "label" { + labels = msg.Labels + } else if evt != nil { + if evt.ContactLabel != "" { + labels = strings.Split(evt.ContactLabel, ", ") + } + } + has := labelListHas(labels, cond.Value) switch cond.Operator { case "has": return has @@ -243,6 +255,46 @@ func matchCondition(cond Condition, msg *Message) bool { } else { fieldValue = "false" } + case "drive_file_name": + if evt != nil { + fieldValue = evt.DriveFileName + } + case "drive_file_path": + if evt != nil { + fieldValue = evt.DriveFilePath + } + case "drive_mime_type": + if evt != nil { + fieldValue = evt.DriveMimeType + } + case "drive_file_size": + if evt != nil { + fieldValue = fmt.Sprintf("%d", evt.DriveFileSize) + } + case "drive_is_folder": + if evt != nil { + if evt.DriveIsFolder { + fieldValue = "true" + } else { + fieldValue = "false" + } + } + case "contact_name": + if evt != nil { + fieldValue = evt.ContactName + } + case "contact_email": + if evt != nil { + fieldValue = evt.ContactEmail + } + case "contact_phone": + if evt != nil { + fieldValue = evt.ContactPhone + } + case "contact_org": + if evt != nil { + fieldValue = evt.ContactOrg + } default: return false } @@ -282,11 +334,15 @@ func matchCondition(cond Condition, msg *Message) bool { } func messageHasLabel(msg *Message, label string) bool { + return labelListHas(msg.Labels, label) +} + +func labelListHas(labels []string, label string) bool { labelLower := strings.ToLower(strings.TrimSpace(label)) if labelLower == "" { return false } - for _, l := range msg.Labels { + for _, l := range labels { if strings.ToLower(l) == labelLower { return true } @@ -294,37 +350,11 @@ func messageHasLabel(msg *Message, label string) bool { return false } -func messageToWebhookContext(msg *Message) *webhooks.MessageContext { - senderName, senderEmail := parseFromAddress(msg.From) - return &webhooks.MessageContext{ - SenderName: senderName, - SenderEmail: senderEmail, - Subject: msg.Subject, - BodyText: msg.BodyText, - Recipients: strings.Join(msg.To, ", "), - HasAttachment: msg.HasAttachments, - MessageID: msg.ID, - } +func messageToWebhookContext(msg *Message, evt *EventContext) *webhooks.MessageContext { + return WebhookContextFromEvent(evt, msg) } -func parseFromAddress(from string) (name, email string) { - from = strings.TrimSpace(from) - if from == "" { - return "", "" - } - if i := strings.LastIndex(from, "<"); i >= 0 { - j := strings.LastIndex(from, ">") - if j > i { - email = strings.TrimSpace(from[i+1 : j]) - name = strings.TrimSpace(from[:i]) - name = strings.Trim(name, `"`) - return name, email - } - } - return "", from -} - -func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message) error { +func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message, evt *EventContext) error { switch action.Type { case "label": _, err := e.db.Exec(ctx, ` @@ -396,7 +426,13 @@ func (e *Engine) executeAction(ctx context.Context, action Action, msg *Message) if e.webhookExec == nil { return fmt.Errorf("webhook executor not configured") } - return e.webhookExec.Execute(ctx, action.Value, messageToWebhookContext(msg)) + return e.webhookExec.Execute(ctx, action.Value, messageToWebhookContext(msg, evt)) + case "drive_move", "drive_rename", "drive_delete", "drive_share", "drive_copy": + e.logger.Info("deferred drive action", "type", action.Type, "value", action.Value) + return nil + case "contact_add_label", "contact_remove_label", "contact_delete": + e.logger.Info("deferred contact action", "type", action.Type, "value", action.Value) + return nil default: return fmt.Errorf("unknown action type: %s", action.Type) } diff --git a/internal/mail/rules/engine_test.go b/internal/mail/rules/engine_test.go index 10259a8..961e843 100644 --- a/internal/mail/rules/engine_test.go +++ b/internal/mail/rules/engine_test.go @@ -48,7 +48,7 @@ func TestMatchCondition_fieldsAndOperators(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := matchCondition(tt.cond, msg); got != tt.match { + if got := matchCondition(tt.cond, msg, nil); got != tt.match { t.Fatalf("matchCondition() = %v, want %v", got, tt.match) } }) @@ -87,7 +87,7 @@ func TestMatchesAll(t *testing.T) { func TestExecuteAction_unknownType(t *testing.T) { e := &Engine{} - err := e.executeAction(context.Background(), Action{Type: "unknown_action", Value: "x@example.com"}, &Message{ID: "msg-1"}) + err := e.executeAction(context.Background(), Action{Type: "unknown_action", Value: "x@example.com"}, &Message{ID: "msg-1"}, nil) if err == nil { t.Fatal("executeAction() error = nil, want unknown action type error") } @@ -168,7 +168,7 @@ func TestParseFromAddress(t *testing.T) { func TestMessageToWebhookContext(t *testing.T) { msg := testMessage() - ctx := messageToWebhookContext(msg) + ctx := messageToWebhookContext(msg, nil) if ctx.SenderName != "Alice" || ctx.SenderEmail != "alice@example.com" { t.Fatalf("sender = (%q, %q), want (Alice, alice@example.com)", ctx.SenderName, ctx.SenderEmail) } @@ -200,7 +200,7 @@ func TestExecuteAction_webhook(t *testing.T) { mock := &mockWebhookExecutor{} e := &Engine{webhookExec: mock} - if err := e.executeAction(context.Background(), Action{Type: "webhook", Value: "tpl-abc"}, msg); err != nil { + if err := e.executeAction(context.Background(), Action{Type: "webhook", Value: "tpl-abc"}, msg, nil); err != nil { t.Fatalf("executeAction() error = %v", err) } if mock.templateID != "tpl-abc" { @@ -211,7 +211,7 @@ func TestExecuteAction_webhook(t *testing.T) { } e.webhookExec = nil - err := e.executeAction(context.Background(), Action{Type: "webhook", Value: "tpl-abc"}, msg) + err := e.executeAction(context.Background(), Action{Type: "webhook", Value: "tpl-abc"}, msg, nil) if err == nil || !strings.Contains(err.Error(), "webhook executor not configured") { t.Fatalf("executeAction() without executor = %v, want not configured error", err) } diff --git a/internal/mail/rules/simulate.go b/internal/mail/rules/simulate.go index e49bb71..b863250 100644 --- a/internal/mail/rules/simulate.go +++ b/internal/mail/rules/simulate.go @@ -37,7 +37,7 @@ func (e *Engine) simulateActions(ctx context.Context, actions []Action, msg *Mes func (e *Engine) simulateAction(ctx context.Context, action Action, msg *Message) SimulatedActionResult { switch action.Type { - case "label", "move", "archive", "delete", "mark_read", "remove_label", "mark_important", "mark_spam", "star", "notify", "reply", "send_mail", "forward": + case "label", "move", "archive", "delete", "mark_read", "remove_label", "mark_important", "mark_spam", "star", "notify", "reply", "send_mail", "forward", "drive_move", "drive_rename", "drive_delete", "drive_share", "drive_copy", "contact_add_label", "contact_remove_label", "contact_delete": return SimulatedActionResult{ ActionResult: ActionResult{Type: action.Type, Value: action.Value, OK: true}, } @@ -58,7 +58,7 @@ func (e *Engine) simulateAction(ctx context.Context, action Action, msg *Message ActionResult: actionResultFrom(action, fmt.Errorf("query template: %w", err)), } } - payload := webhooks.RenderBodyTemplate(bodyTemplate, messageToWebhookContext(msg)) + payload := webhooks.RenderBodyTemplate(bodyTemplate, messageToWebhookContext(msg, nil)) return SimulatedActionResult{ ActionResult: ActionResult{Type: action.Type, Value: action.Value, OK: true}, SimulatedPayload: payload, diff --git a/internal/mail/rules/webhook_context.go b/internal/mail/rules/webhook_context.go new file mode 100644 index 0000000..2e83464 --- /dev/null +++ b/internal/mail/rules/webhook_context.go @@ -0,0 +1,73 @@ +package rules + +import ( + "strconv" + "strings" + "time" + + "github.com/ultisuite/ulti-backend/internal/mail/webhooks" +) + +func WebhookContextFromEvent(evt *EventContext, msg *Message) *webhooks.MessageContext { + ctx := &webhooks.MessageContext{Date: time.Now().UTC().Format(time.RFC3339)} + if msg != nil { + senderName, senderEmail := parseFromAddress(msg.From) + ctx.SenderName = senderName + ctx.SenderEmail = senderEmail + ctx.Subject = msg.Subject + ctx.BodyText = msg.BodyText + ctx.Recipients = strings.Join(msg.To, ", ") + ctx.HasAttachment = msg.HasAttachments + ctx.MessageID = msg.ID + } + if evt == nil { + ctx.EventDomain = "mail" + return ctx + } + ctx.EventType = string(evt.Type) + ctx.EventDomain = webhookEventDomain(evt.Type) + ctx.DriveFileName = evt.DriveFileName + ctx.DriveFilePath = evt.DriveFilePath + ctx.DriveMimeType = evt.DriveMimeType + ctx.DriveFileSize = strconv.FormatInt(evt.DriveFileSize, 10) + if evt.DriveIsFolder { + ctx.DriveIsFolder = "true" + } else { + ctx.DriveIsFolder = "false" + } + ctx.ContactID = evt.ContactID + ctx.ContactName = evt.ContactName + ctx.ContactEmail = evt.ContactEmail + ctx.ContactPhone = evt.ContactPhone + ctx.ContactOrg = evt.ContactOrg + return ctx +} + +func webhookEventDomain(trigger TriggerType) string { + switch trigger { + case TriggerDriveFileCreated, TriggerDriveFileUpdated, TriggerDriveFileDeleted, + TriggerDriveFileMoved, TriggerDriveShareUpdated: + return "drive" + case TriggerContactCreated, TriggerContactUpdated, TriggerContactDeleted: + return "contacts" + default: + return "mail" + } +} + +func parseFromAddress(from string) (name, email string) { + from = strings.TrimSpace(from) + if from == "" { + return "", "" + } + if i := strings.LastIndex(from, "<"); i >= 0 { + j := strings.LastIndex(from, ">") + if j > i { + email = strings.TrimSpace(from[i+1 : j]) + name = strings.TrimSpace(from[:i]) + name = strings.Trim(name, `"`) + return name, email + } + } + return "", from +} diff --git a/internal/mail/rules/workflow.go b/internal/mail/rules/workflow.go index 247cf25..b5a989f 100644 --- a/internal/mail/rules/workflow.go +++ b/internal/mail/rules/workflow.go @@ -3,6 +3,7 @@ package rules import ( "encoding/json" "fmt" + "strings" ) const WorkflowVersion = 1 @@ -17,16 +18,26 @@ const ( type TriggerType string const ( - TriggerMessageReceived TriggerType = "message_received" - TriggerLabelAdded TriggerType = "label_added" - TriggerLabelRemoved TriggerType = "label_removed" + TriggerMessageReceived TriggerType = "message_received" + TriggerLabelAdded TriggerType = "label_added" + TriggerLabelRemoved TriggerType = "label_removed" + TriggerDriveFileCreated TriggerType = "drive_file_created" + TriggerDriveFileUpdated TriggerType = "drive_file_updated" + TriggerDriveFileDeleted TriggerType = "drive_file_deleted" + TriggerDriveFileMoved TriggerType = "drive_file_moved" + TriggerDriveShareUpdated TriggerType = "drive_share_updated" + TriggerContactCreated TriggerType = "contact_created" + TriggerContactUpdated TriggerType = "contact_updated" + TriggerContactDeleted TriggerType = "contact_deleted" ) type Trigger struct { - Type TriggerType `json:"type"` - FolderID string `json:"folder_id,omitempty"` - Label string `json:"label,omitempty"` - AccountID string `json:"account_id,omitempty"` + Type TriggerType `json:"type"` + FolderID string `json:"folder_id,omitempty"` + Label string `json:"label,omitempty"` + AccountID string `json:"account_id,omitempty"` + FolderPath string `json:"folder_path,omitempty"` + ContactLabel string `json:"contact_label,omitempty"` } type TriggerGroup struct { @@ -114,9 +125,24 @@ type CallRuleNodeData struct { } type EventContext struct { - Type TriggerType - FolderID string - Label string + Type TriggerType + FolderID string + Label string + FolderPath string + ContactLabel string + // Drive payload (when domain is drive) + DriveFileName string + DriveFilePath string + DriveMimeType string + DriveFileSize int64 + DriveIsFolder bool + // Contact payload (when domain is contacts) + ContactID string + ContactBookID string + ContactName string + ContactEmail string + ContactPhone string + ContactOrg string } func ParseWorkflow(raw []byte) (*Workflow, error) { @@ -234,6 +260,22 @@ func matchTrigger(t Trigger, msg *Message, evt *EventContext) bool { return false } return true + case TriggerDriveFileCreated, TriggerDriveFileUpdated, TriggerDriveFileDeleted, TriggerDriveFileMoved, TriggerDriveShareUpdated: + if evt == nil || evt.Type != t.Type { + return false + } + if t.FolderPath != "" && evt.FolderPath != "" && !strings.HasPrefix(evt.DriveFilePath, t.FolderPath) { + return false + } + return true + case TriggerContactCreated, TriggerContactUpdated, TriggerContactDeleted: + if evt == nil || evt.Type != t.Type { + return false + } + if t.ContactLabel != "" && evt.ContactLabel != "" && t.ContactLabel != evt.ContactLabel { + return false + } + return true default: return false } diff --git a/internal/mail/rules/workflow_exec.go b/internal/mail/rules/workflow_exec.go index 1a97858..682c357 100644 --- a/internal/mail/rules/workflow_exec.go +++ b/internal/mail/rules/workflow_exec.go @@ -10,11 +10,12 @@ import ( type ExecContext struct { Variables map[string]string Message *Message + Event *EventContext UserID string Results []ActionResult } -func newExecContext(msg *Message, userID string, vars []ExecVariable) *ExecContext { +func newExecContext(msg *Message, userID string, vars []ExecVariable, evt *EventContext) *ExecContext { m := make(map[string]string, len(vars)) for _, v := range vars { m[v.Name] = v.Default @@ -22,6 +23,7 @@ func newExecContext(msg *Message, userID string, vars []ExecVariable) *ExecConte return &ExecContext{ Variables: m, Message: msg, + Event: evt, UserID: userID, Results: make([]ActionResult, 0), } @@ -32,7 +34,7 @@ func (e *Engine) ExecuteWorkflow(ctx context.Context, userID string, msg *Messag return nil, nil } if wf.Kind == RuleKindFunction { - return e.runWorkflowGraph(ctx, userID, msg, wf, newExecContext(msg, userID, wf.Variables)) + return e.runWorkflowGraph(ctx, userID, msg, wf, newExecContext(msg, userID, wf.Variables, evt)) } if !matchesTriggers(wf.Triggers, msg, evt) { return nil, nil @@ -41,7 +43,7 @@ func (e *Engine) ExecuteWorkflow(ctx context.Context, userID string, msg *Messag if startID == "" { return nil, fmt.Errorf("workflow missing start node") } - execCtx := newExecContext(msg, userID, wf.Variables) + execCtx := newExecContext(msg, userID, wf.Variables, evt) if err := e.walkWorkflow(ctx, userID, msg, wf, startID, execCtx, 0); err != nil { return execCtx.Results, err } @@ -78,7 +80,7 @@ func (e *Engine) walkWorkflow(ctx context.Context, userID string, msg *Message, cond.Operator = "not_has" } handle := "false" - if matchCondition(cond, msg) { + if matchCondition(cond, msg, execCtx.Event) { handle = "true" } return e.walkWorkflow(ctx, userID, msg, wf, wf.nextNode(nodeID, handle), execCtx, depth+1) @@ -90,7 +92,7 @@ func (e *Engine) walkWorkflow(ctx context.Context, userID string, msg *Message, } cond := Condition{Field: data.Field, Operator: data.Operator, Value: interpolateValue(data.Value, execCtx)} handle := "false" - if matchCondition(cond, msg) { + if matchCondition(cond, msg, execCtx.Event) { handle = "true" } return e.walkWorkflow(ctx, userID, msg, wf, wf.nextNode(nodeID, handle), execCtx, depth+1) @@ -100,7 +102,7 @@ func (e *Engine) walkWorkflow(ctx context.Context, userID string, msg *Message, if err := json.Unmarshal(node.Data, &data); err != nil { return fmt.Errorf("switch node data: %w", err) } - fieldVal := workflowFieldValue(data.Field, msg, execCtx) + fieldVal := workflowFieldValue(data.Field, msg, execCtx.Event, execCtx) handle := "default" for i, c := range data.Cases { if strings.EqualFold(fieldVal, c.Value) { @@ -132,7 +134,7 @@ func (e *Engine) walkWorkflow(ctx context.Context, userID string, msg *Message, } for _, item := range data.Actions { action := Action{Type: item.Type, Value: interpolateValue(item.Value, execCtx)} - err := e.executeAction(ctx, action, msg) + err := e.executeAction(ctx, action, msg, execCtx.Event) result := actionResultFrom(action, err) execCtx.Results = append(execCtx.Results, result) if err != nil { @@ -199,6 +201,7 @@ func (e *Engine) invokeSubWorkflow(ctx context.Context, userID string, msg *Mess childCtx := &ExecContext{ Variables: copyVars(parent.Variables), Message: msg, + Event: parent.Event, UserID: userID, Results: parent.Results, } @@ -217,7 +220,7 @@ func copyVars(src map[string]string) map[string]string { return dst } -func workflowFieldValue(field string, msg *Message, execCtx *ExecContext) string { +func workflowFieldValue(field string, msg *Message, evt *EventContext, execCtx *ExecContext) string { if strings.HasPrefix(field, "$") { name := strings.TrimPrefix(field, "$") if v, ok := execCtx.Variables[name]; ok { @@ -242,7 +245,36 @@ func workflowFieldValue(field string, msg *Message, execCtx *ExecContext) string case "label": return strings.Join(msg.Labels, ", ") default: - return "" + if evt == nil { + return "" + } + switch field { + case "drive_file_name": + return evt.DriveFileName + case "drive_file_path": + return evt.DriveFilePath + case "drive_mime_type": + return evt.DriveMimeType + case "drive_file_size": + return fmt.Sprintf("%d", evt.DriveFileSize) + case "drive_is_folder": + if evt.DriveIsFolder { + return "true" + } + return "false" + case "contact_name": + return evt.ContactName + case "contact_email": + return evt.ContactEmail + case "contact_phone": + return evt.ContactPhone + case "contact_org": + return evt.ContactOrg + case "contact_label": + return evt.ContactLabel + default: + return "" + } } } diff --git a/internal/mail/rules/workflow_simulate.go b/internal/mail/rules/workflow_simulate.go index fd950e3..b817a48 100644 --- a/internal/mail/rules/workflow_simulate.go +++ b/internal/mail/rules/workflow_simulate.go @@ -29,7 +29,7 @@ func (e *Engine) SimulateWorkflow(ctx context.Context, userID string, wf *Workfl if startID == "" { return WorkflowSimulationResult{Matched: false} } - execCtx := newExecContext(msg, userID, wf.Variables) + execCtx := newExecContext(msg, userID, wf.Variables, evt) steps := make([]WorkflowSimulationStep, 0) e.simulateWalk(ctx, userID, msg, wf, startID, execCtx, &steps, 0) simActions := make([]SimulatedActionResult, 0, len(execCtx.Results)) @@ -62,7 +62,7 @@ func (e *Engine) simulateWalk(ctx context.Context, userID string, msg *Message, var data ConditionNodeData json.Unmarshal(node.Data, &data) handle := "false" - if matchCondition(Condition{Field: data.Field, Operator: data.Operator, Value: interpolateValue(data.Value, execCtx)}, msg) { + if matchCondition(Condition{Field: data.Field, Operator: data.Operator, Value: interpolateValue(data.Value, execCtx)}, msg, execCtx.Event) { handle = "true" } *steps = append(*steps, WorkflowSimulationStep{NodeID: nodeID, NodeType: node.Type, Handle: handle}) @@ -76,7 +76,7 @@ func (e *Engine) simulateWalk(ctx context.Context, userID string, msg *Message, op = "not_has" } handle := "false" - if matchCondition(Condition{Field: "label", Operator: op, Value: data.Label}, msg) { + if matchCondition(Condition{Field: "label", Operator: op, Value: data.Label}, msg, execCtx.Event) { handle = "true" } *steps = append(*steps, WorkflowSimulationStep{NodeID: nodeID, NodeType: node.Type, Handle: handle}) @@ -85,7 +85,7 @@ func (e *Engine) simulateWalk(ctx context.Context, userID string, msg *Message, case "switch": var data SwitchNodeData json.Unmarshal(node.Data, &data) - fieldVal := workflowFieldValue(data.Field, msg, execCtx) + fieldVal := workflowFieldValue(data.Field, msg, execCtx.Event, execCtx) handle := "default" for i, c := range data.Cases { if fieldVal == c.Value { diff --git a/internal/mail/webhooks/executor.go b/internal/mail/webhooks/executor.go index eaccef0..226cec5 100644 --- a/internal/mail/webhooks/executor.go +++ b/internal/mail/webhooks/executor.go @@ -45,6 +45,18 @@ type MessageContext struct { Recipients string `json:"recipients"` HasAttachment bool `json:"has_attachment"` MessageID string `json:"message_id"` + EventType string `json:"event_type,omitempty"` + EventDomain string `json:"event_domain,omitempty"` + DriveFileName string `json:"drive_file_name,omitempty"` + DriveFilePath string `json:"drive_file_path,omitempty"` + DriveMimeType string `json:"drive_mime_type,omitempty"` + DriveFileSize string `json:"drive_file_size,omitempty"` + DriveIsFolder string `json:"drive_is_folder,omitempty"` + ContactID string `json:"contact_id,omitempty"` + ContactName string `json:"contact_name,omitempty"` + ContactEmail string `json:"contact_email,omitempty"` + ContactPhone string `json:"contact_phone,omitempty"` + ContactOrg string `json:"contact_org,omitempty"` } const ( @@ -276,6 +288,18 @@ func interpolate(template string, ctx *MessageContext) string { "$date", ctx.Date, "$recipients.to", ctx.Recipients, "$message_id", ctx.MessageID, + "$event.type", ctx.EventType, + "$event.domain", ctx.EventDomain, + "$drive.file_name", ctx.DriveFileName, + "$drive.file_path", ctx.DriveFilePath, + "$drive.mime_type", ctx.DriveMimeType, + "$drive.file_size", ctx.DriveFileSize, + "$drive.is_folder", ctx.DriveIsFolder, + "$contact.id", ctx.ContactID, + "$contact.name", ctx.ContactName, + "$contact.email", ctx.ContactEmail, + "$contact.phone", ctx.ContactPhone, + "$contact.org", ctx.ContactOrg, ) return r.Replace(template) } diff --git a/migrations/000030_webhook_automation_scope.down.sql b/migrations/000030_webhook_automation_scope.down.sql new file mode 100644 index 0000000..9afdd7a --- /dev/null +++ b/migrations/000030_webhook_automation_scope.down.sql @@ -0,0 +1,5 @@ +ALTER TABLE webhook_templates + DROP COLUMN IF EXISTS event_types, + DROP COLUMN IF EXISTS mail_scope, + DROP COLUMN IF EXISTS drive_scope, + DROP COLUMN IF EXISTS contacts_scope; diff --git a/migrations/000030_webhook_automation_scope.up.sql b/migrations/000030_webhook_automation_scope.up.sql new file mode 100644 index 0000000..9cd0753 --- /dev/null +++ b/migrations/000030_webhook_automation_scope.up.sql @@ -0,0 +1,5 @@ +ALTER TABLE webhook_templates + ADD COLUMN IF NOT EXISTS event_types JSONB NOT NULL DEFAULT '[]'::jsonb, + ADD COLUMN IF NOT EXISTS mail_scope JSONB NOT NULL DEFAULT '{"all_accounts":true,"account_ids":[]}'::jsonb, + ADD COLUMN IF NOT EXISTS drive_scope JSONB NOT NULL DEFAULT '{"all_folders":true,"folder_paths":[]}'::jsonb, + ADD COLUMN IF NOT EXISTS contacts_scope JSONB NOT NULL DEFAULT '{"all_books":true,"book_ids":[]}'::jsonb;