diff --git a/internal/api/mail/handlers.go b/internal/api/mail/handlers.go index 450c0b1..cb1590c 100644 --- a/internal/api/mail/handlers.go +++ b/internal/api/mail/handlers.go @@ -96,9 +96,12 @@ func (h *Handler) Routes() chi.Router { r.Post("/rules", h.CreateRule) r.Put("/rules/{ruleID}", h.UpdateRule) r.Delete("/rules/{ruleID}", h.DeleteRule) + r.Post("/rules/simulate", h.SimulateRule) r.Get("/webhooks", h.ListWebhooks) r.Post("/webhooks", h.CreateWebhook) + r.Post("/webhooks/preview", h.PreviewWebhookTemplate) + r.Put("/webhooks/{webhookID}", h.UpdateWebhook) r.Delete("/webhooks/{webhookID}", h.DeleteWebhook) return r @@ -422,6 +425,31 @@ func (h *Handler) DeleteRule(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +func (h *Handler) SimulateRule(w http.ResponseWriter, r *http.Request) { + claims := middleware.ClaimsFromContext(r.Context()) + + var req simulateRuleRequest + if err := apivalidate.DecodeJSON(w, r, maxRulesRequestBody, &req); err != nil { + return + } + if verr := validateSimulateRule(&req); verr != nil { + apivalidate.WriteValidationError(w, r, verr) + return + } + + result, err := h.svc.SimulateRule(r.Context(), claims.Sub, &req) + if err != nil { + if errors.Is(err, ErrNotFound) { + apivalidate.WriteNotFound(w, r, "not found") + return + } + h.logger.Error("simulate rule", "error", err) + apivalidate.WriteInternal(w, r) + return + } + apiresponse.WriteJSON(w, http.StatusOK, result) +} + func (h *Handler) ListWebhooks(w http.ResponseWriter, r *http.Request) { claims := middleware.ClaimsFromContext(r.Context()) params, err := query.ParseListRequest(r) @@ -446,13 +474,13 @@ func (h *Handler) CreateWebhook(w http.ResponseWriter, r *http.Request) { if err := apivalidate.DecodeJSON(w, r, maxWebhookRequestBody, &req); err != nil { return } - method, verr := validateCreateWebhook(&req) + method, maxRetries, verr := validateCreateWebhook(&req) if verr != nil { apivalidate.WriteValidationError(w, r, verr) return } - id, err := h.svc.CreateWebhook(r.Context(), claims.Sub, &req, method) + id, err := h.svc.CreateWebhook(r.Context(), claims.Sub, &req, method, maxRetries) if err != nil { h.logger.Error("create webhook", "error", err) apivalidate.WriteInternal(w, r) @@ -461,6 +489,52 @@ func (h *Handler) CreateWebhook(w http.ResponseWriter, r *http.Request) { apiresponse.WriteJSON(w, http.StatusCreated, map[string]string{"id": id}) } +func (h *Handler) UpdateWebhook(w http.ResponseWriter, r *http.Request) { + claims := middleware.ClaimsFromContext(r.Context()) + + var req updateWebhookRequest + if err := apivalidate.DecodeJSON(w, r, maxWebhookRequestBody, &req); err != nil { + return + } + method, maxRetries, verr := validateUpdateWebhook(&req) + if verr != nil { + apivalidate.WriteValidationError(w, r, verr) + return + } + + if err := h.svc.UpdateWebhook(r.Context(), claims.Sub, chi.URLParam(r, "webhookID"), &req, method, maxRetries); err != nil { + if errors.Is(err, ErrNotFound) { + apivalidate.WriteNotFound(w, r, "not found") + return + } + h.logger.Error("update webhook", "error", err) + apivalidate.WriteInternal(w, r) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (h *Handler) PreviewWebhookTemplate(w http.ResponseWriter, r *http.Request) { + claims := middleware.ClaimsFromContext(r.Context()) + + var req previewWebhookRequest + if err := apivalidate.DecodeJSON(w, r, maxWebhookRequestBody, &req); err != nil { + return + } + if verr := validatePreviewWebhook(&req); verr != nil { + apivalidate.WriteValidationError(w, r, verr) + return + } + + result, err := h.svc.PreviewWebhookTemplate(r.Context(), claims.Sub, &req) + if err != nil { + h.logger.Error("preview webhook template", "error", err) + apivalidate.WriteInternal(w, r) + return + } + apiresponse.WriteJSON(w, http.StatusOK, result) +} + func (h *Handler) DeleteWebhook(w http.ResponseWriter, r *http.Request) { claims := middleware.ClaimsFromContext(r.Context()) diff --git a/internal/api/mail/handlers_test.go b/internal/api/mail/handlers_test.go index adbe129..79e9e12 100644 --- a/internal/api/mail/handlers_test.go +++ b/internal/api/mail/handlers_test.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -15,6 +16,7 @@ import ( "github.com/ultisuite/ulti-backend/internal/api/middleware" "github.com/ultisuite/ulti-backend/internal/api/query" "github.com/ultisuite/ulti-backend/internal/auth" + "github.com/ultisuite/ulti-backend/internal/mail/rules" ) const ( @@ -249,12 +251,59 @@ func (f *fakeMailService) DeleteRule(_ context.Context, externalID, ruleID strin } return nil } + +func (f *fakeMailService) SimulateRule(_ context.Context, externalID string, req *simulateRuleRequest) (rules.SimulationResult, error) { + if externalID != testExternalID { + return rules.SimulationResult{}, ErrUserNotProvisioned + } + if req.Message == nil { + return rules.SimulationResult{}, ErrNotFound + } + + matched := false + for _, cond := range []struct { + field, operator, value string + }{ + {"from", "contains", "alice"}, + {"subject", "contains", "invoice"}, + } { + fieldValue := "" + switch cond.field { + case "from": + fieldValue = req.Message.From + case "subject": + fieldValue = req.Message.Subject + } + if strings.Contains(strings.ToLower(fieldValue), strings.ToLower(cond.value)) { + matched = true + break + } + } + + if !matched { + return rules.SimulationResult{Matched: false}, nil + } + + return rules.SimulationResult{ + Matched: true, + Actions: []rules.SimulatedActionResult{ + {ActionResult: rules.ActionResult{Type: "label", Value: "work", OK: true}}, + }, + }, nil +} + func (f *fakeMailService) ListWebhooks(context.Context, string, query.ListParams) (WebhooksList, error) { return WebhooksList{}, nil } -func (f *fakeMailService) CreateWebhook(context.Context, string, *createWebhookRequest, string) (string, error) { +func (f *fakeMailService) CreateWebhook(context.Context, string, *createWebhookRequest, string, int) (string, error) { return "", nil } +func (f *fakeMailService) UpdateWebhook(context.Context, string, string, *updateWebhookRequest, string, int) error { + return nil +} +func (f *fakeMailService) PreviewWebhookTemplate(_ context.Context, _ string, req *previewWebhookRequest) (map[string]any, error) { + return map[string]any{"payload": req.BodyTemplate}, nil +} func (f *fakeMailService) DeleteWebhook(_ context.Context, externalID, webhookID string) error { if externalID != testExternalID { return ErrNotFound @@ -796,3 +845,131 @@ func TestCancelScheduledOutbox(t *testing.T) { } }) } + +func TestSimulateRule(t *testing.T) { + svc := newFakeMailService() + router := newTestMailRouter(svc) + + t.Run("matched inline rule", func(t *testing.T) { + body, err := json.Marshal(map[string]any{ + "message": map[string]any{ + "from": "Alice ", + "to": []string{"bob@example.com"}, + "subject": "Invoice Q1", + }, + "rule": map[string]any{ + "conditions": []map[string]string{ + {"field": "subject", "operator": "contains", "value": "invoice"}, + }, + "actions": []map[string]string{ + {"type": "label", "value": "work"}, + }, + }, + }) + if err != nil { + t.Fatalf("marshal payload: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/rules/simulate", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusOK, rec.Body.String()) + } + + var resp rules.SimulationResult + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("decode body: %v", err) + } + if !resp.Matched { + t.Fatal("matched = false, want true") + } + if len(resp.Actions) != 1 || resp.Actions[0].Type != "label" || !resp.Actions[0].OK { + t.Fatalf("actions = %#v", resp.Actions) + } + }) + + t.Run("no match", func(t *testing.T) { + body, err := json.Marshal(map[string]any{ + "message": map[string]any{ + "from": "bob@example.com", + "subject": "Hello", + }, + "rule": map[string]any{ + "conditions": []map[string]string{ + {"field": "subject", "operator": "contains", "value": "invoice"}, + }, + "actions": []map[string]string{ + {"type": "label", "value": "work"}, + }, + }, + }) + if err != nil { + t.Fatalf("marshal payload: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/rules/simulate", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusOK, rec.Body.String()) + } + + var resp rules.SimulationResult + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("decode body: %v", err) + } + if resp.Matched { + t.Fatal("matched = true, want false") + } + }) + + t.Run("validation missing message", func(t *testing.T) { + body, err := json.Marshal(map[string]any{ + "rule": map[string]any{ + "conditions": []map[string]string{ + {"field": "subject", "operator": "contains", "value": "invoice"}, + }, + "actions": []map[string]string{ + {"type": "label", "value": "work"}, + }, + }, + }) + if err != nil { + t.Fatalf("marshal payload: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/rules/simulate", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusBadRequest, rec.Body.String()) + } + }) + + t.Run("validation missing rule source", func(t *testing.T) { + body, err := json.Marshal(map[string]any{ + "message": map[string]any{ + "subject": "Invoice Q1", + }, + }) + if err != nil { + t.Fatalf("marshal payload: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/rules/simulate", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusBadRequest, rec.Body.String()) + } + }) +} diff --git a/internal/api/mail/service.go b/internal/api/mail/service.go index 686bba3..d2f501b 100644 --- a/internal/api/mail/service.go +++ b/internal/api/mail/service.go @@ -603,7 +603,7 @@ func (s *Service) ListWebhooks(ctx context.Context, externalID string, params qu } rows, err := s.db.Query(ctx, ` - SELECT id, name, url, method, is_active FROM webhook_templates + SELECT id, name, url, method, version, is_active FROM webhook_templates WHERE user_id = (SELECT id FROM users WHERE external_id = $1) ORDER BY created_at ASC LIMIT $2 OFFSET $3 @@ -616,12 +616,13 @@ func (s *Service) ListWebhooks(ctx context.Context, externalID string, params qu webhooks := make([]map[string]any, 0) for rows.Next() { var id, name, url, method string + var version int var isActive bool - if err := rows.Scan(&id, &name, &url, &method, &isActive); err != nil { + if err := rows.Scan(&id, &name, &url, &method, &version, &isActive); err != nil { return WebhooksList{}, err } webhooks = append(webhooks, map[string]any{ - "id": id, "name": name, "url": url, "method": method, "is_active": isActive, + "id": id, "name": name, "url": url, "method": method, "version": version, "is_active": isActive, }) } if err := rows.Err(); err != nil { @@ -634,18 +635,24 @@ func (s *Service) ListWebhooks(ctx context.Context, externalID string, params qu }, nil } -func (s *Service) CreateWebhook(ctx context.Context, externalID string, req *createWebhookRequest, method string) (string, error) { +func (s *Service) CreateWebhook(ctx context.Context, externalID string, req *createWebhookRequest, method string, maxRetries int) (string, error) { headersJSON, _ := json.Marshal(req.Headers) var id string err := s.db.QueryRow(ctx, ` - INSERT INTO webhook_templates (user_id, name, url, method, headers, body_template) - VALUES ((SELECT id FROM users WHERE external_id = $1), $2, $3, $4, $5, $6) + INSERT INTO webhook_templates (user_id, name, url, method, headers, body_template, version, signing_secret, max_retries) + VALUES ((SELECT id FROM users WHERE external_id = $1), $2, $3, $4, $5, $6, 1, $7, $8) RETURNING id - `, externalID, req.Name, req.URL, method, headersJSON, req.BodyTemplate).Scan(&id) + `, externalID, req.Name, req.URL, method, headersJSON, req.BodyTemplate, req.SigningSecret, maxRetries).Scan(&id) if err != nil { return "", err } + if _, err := s.db.Exec(ctx, ` + INSERT INTO webhook_template_versions (template_id, version, method, headers, body_template) + VALUES ($1, 1, $2, $3, $4) + `, id, method, headersJSON, req.BodyTemplate); err != nil { + return "", err + } return id, nil } diff --git a/internal/api/mail/service_iface.go b/internal/api/mail/service_iface.go index 271c9ad..762af30 100644 --- a/internal/api/mail/service_iface.go +++ b/internal/api/mail/service_iface.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ultisuite/ulti-backend/internal/api/query" + "github.com/ultisuite/ulti-backend/internal/mail/rules" ) // ServiceAPI is the mail handler service boundary. *Service implements it in production. @@ -34,8 +35,11 @@ type ServiceAPI interface { CreateRule(ctx context.Context, userID string, req *createRuleRequest) (string, error) UpdateRule(ctx context.Context, externalID, ruleID string, req *updateRuleRequest) error DeleteRule(ctx context.Context, externalID, ruleID string) error + SimulateRule(ctx context.Context, externalID string, req *simulateRuleRequest) (rules.SimulationResult, error) ListWebhooks(ctx context.Context, externalID string, params query.ListParams) (WebhooksList, error) - CreateWebhook(ctx context.Context, externalID string, req *createWebhookRequest, method string) (string, error) + CreateWebhook(ctx context.Context, externalID string, req *createWebhookRequest, method string, maxRetries int) (string, error) + UpdateWebhook(ctx context.Context, externalID, webhookID string, req *updateWebhookRequest, method string, maxRetries int) error + PreviewWebhookTemplate(ctx context.Context, externalID string, req *previewWebhookRequest) (map[string]any, error) DeleteWebhook(ctx context.Context, externalID, webhookID string) error ListIdentities(ctx context.Context, externalID, accountID string, params query.ListParams) (IdentitiesList, error) GetIdentity(ctx context.Context, externalID, identityID string) (map[string]any, error) diff --git a/internal/api/mail/service_rules_simulate.go b/internal/api/mail/service_rules_simulate.go new file mode 100644 index 0000000..235607d --- /dev/null +++ b/internal/api/mail/service_rules_simulate.go @@ -0,0 +1,70 @@ +package mail + +import ( + "context" + "encoding/json" + "errors" + + "github.com/jackc/pgx/v5" + + "github.com/ultisuite/ulti-backend/internal/mail/rules" +) + +func (s *Service) SimulateRule(ctx context.Context, externalID string, req *simulateRuleRequest) (rules.SimulationResult, error) { + conditions, actions, err := s.resolveSimulateRule(ctx, externalID, req) + if err != nil { + return rules.SimulationResult{}, err + } + + msg := &rules.Message{ + ID: "simulation", + From: req.Message.From, + To: req.Message.To, + Subject: req.Message.Subject, + BodyText: req.Message.BodyText, + HasAttachments: req.Message.HasAttachments, + } + + engine := rules.NewEngine(s.db) + return engine.SimulateRule(ctx, conditions, actions, msg), nil +} + +func (s *Service) resolveSimulateRule(ctx context.Context, externalID string, req *simulateRuleRequest) ([]rules.Condition, []rules.Action, error) { + if req.RuleID != "" { + var condJSON, actJSON []byte + err := s.db.QueryRow(ctx, ` + SELECT conditions, actions + FROM mail_rules + WHERE id = $1 AND user_id = (SELECT id FROM users WHERE external_id = $2) + `, req.RuleID, externalID).Scan(&condJSON, &actJSON) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil, ErrNotFound + } + return nil, nil, err + } + return unmarshalRuleConditionsActions(condJSON, actJSON) + } + + condJSON, err := json.Marshal(req.Rule.Conditions) + if err != nil { + return nil, nil, err + } + actJSON, err := json.Marshal(req.Rule.Actions) + if err != nil { + return nil, nil, err + } + return unmarshalRuleConditionsActions(condJSON, actJSON) +} + +func unmarshalRuleConditionsActions(condJSON, actJSON []byte) ([]rules.Condition, []rules.Action, error) { + var conditions []rules.Condition + var actions []rules.Action + if err := json.Unmarshal(condJSON, &conditions); err != nil { + return nil, nil, err + } + if err := json.Unmarshal(actJSON, &actions); err != nil { + return nil, nil, err + } + return conditions, actions, nil +} diff --git a/internal/api/mail/service_webhooks.go b/internal/api/mail/service_webhooks.go new file mode 100644 index 0000000..1f119f9 --- /dev/null +++ b/internal/api/mail/service_webhooks.go @@ -0,0 +1,70 @@ +package mail + +import ( + "context" + "encoding/json" + "errors" + + "github.com/jackc/pgx/v5" + "github.com/ultisuite/ulti-backend/internal/mail/webhooks" +) + +func (s *Service) UpdateWebhook(ctx context.Context, externalID, webhookID string, req *updateWebhookRequest, method string, maxRetries int) error { + headersJSON, _ := json.Marshal(req.Headers) + + tx, err := s.db.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + var version int + err = tx.QueryRow(ctx, ` + UPDATE webhook_templates + SET + name = $1, + url = $2, + method = $3, + headers = $4, + body_template = $5, + signing_secret = $6, + max_retries = $7, + version = version + 1, + updated_at = NOW() + WHERE id = $8 + AND user_id = (SELECT id FROM users WHERE external_id = $9) + RETURNING version + `, req.Name, req.URL, method, headersJSON, req.BodyTemplate, req.SigningSecret, maxRetries, webhookID, externalID).Scan(&version) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return ErrNotFound + } + return err + } + + if _, err := tx.Exec(ctx, ` + INSERT INTO webhook_template_versions (template_id, version, method, headers, body_template) + VALUES ($1, $2, $3, $4, $5) + `, webhookID, version, method, headersJSON, req.BodyTemplate); err != nil { + return err + } + + return tx.Commit(ctx) +} + +func (s *Service) PreviewWebhookTemplate(_ context.Context, _ string, req *previewWebhookRequest) (map[string]any, error) { + msgCtx := &webhooks.MessageContext{ + SenderName: req.Message.SenderName, + SenderEmail: req.Message.SenderEmail, + Subject: req.Message.Subject, + BodyText: req.Message.BodyText, + BodyHTML: req.Message.BodyHTML, + Date: req.Message.Date, + Recipients: req.Message.Recipients, + HasAttachment: req.Message.HasAttachment, + MessageID: req.Message.MessageID, + } + return map[string]any{ + "payload": webhooks.RenderBodyTemplate(req.BodyTemplate, msgCtx), + }, nil +} diff --git a/internal/api/mail/validate.go b/internal/api/mail/validate.go index 1538c11..5bf57a4 100644 --- a/internal/api/mail/validate.go +++ b/internal/api/mail/validate.go @@ -26,6 +26,9 @@ const ( maxWebhookHeaders = 20 maxHeaderNameLen = 256 maxHeaderValueLen = 8192 + maxWebhookSecretLen = 512 + defaultWebhookRetries = 3 + maxWebhookRetries = 10 maxSubjectLen = 998 maxEmailLen = 320 @@ -349,6 +352,52 @@ type updateRuleRequest struct { Actions any `json:"actions"` } +type simulateRuleSampleMessage struct { + From string `json:"from"` + To []string `json:"to"` + Subject string `json:"subject"` + BodyText string `json:"body_text"` + HasAttachments bool `json:"has_attachments"` +} + +type simulateRuleInlineRule struct { + Conditions any `json:"conditions"` + Actions any `json:"actions"` +} + +type simulateRuleRequest struct { + Message *simulateRuleSampleMessage `json:"message"` + RuleID string `json:"rule_id"` + Rule *simulateRuleInlineRule `json:"rule"` +} + +func validateSimulateRule(req *simulateRuleRequest) *apivalidate.ValidationError { + var details []apivalidate.FieldDetail + if req.Message == nil { + details = append(details, apivalidate.FieldDetail{Field: "message", Message: "required"}) + } + hasRuleID := strings.TrimSpace(req.RuleID) != "" + hasInlineRule := req.Rule != nil + if hasRuleID && hasInlineRule { + details = append(details, apivalidate.FieldDetail{Field: "rule_id", Message: "provide rule_id or rule, not both"}) + } + if !hasRuleID && !hasInlineRule { + details = append(details, apivalidate.FieldDetail{Field: "rule_id", Message: "rule_id or rule required"}) + } + if hasInlineRule { + if req.Rule.Conditions == nil { + details = append(details, apivalidate.FieldDetail{Field: "rule.conditions", Message: "required"}) + } + if req.Rule.Actions == nil { + details = append(details, apivalidate.FieldDetail{Field: "rule.actions", Message: "required"}) + } + } + if len(details) == 0 { + return nil + } + return apivalidate.NewValidationError(details...) +} + func validateUpdateRule(req *updateRuleRequest) *apivalidate.ValidationError { var details []apivalidate.FieldDetail if strings.TrimSpace(req.Name) == "" { @@ -369,11 +418,40 @@ func validateUpdateRule(req *updateRuleRequest) *apivalidate.ValidationError { } type createWebhookRequest struct { - Name string `json:"name"` - URL string `json:"url"` - Method string `json:"method"` - Headers map[string]string `json:"headers"` - BodyTemplate string `json:"body_template"` + Name string `json:"name"` + URL string `json:"url"` + Method string `json:"method"` + Headers map[string]string `json:"headers"` + BodyTemplate string `json:"body_template"` + SigningSecret string `json:"signing_secret"` + MaxRetries *int `json:"max_retries"` +} + +type updateWebhookRequest struct { + Name string `json:"name"` + URL string `json:"url"` + Method string `json:"method"` + Headers map[string]string `json:"headers"` + BodyTemplate string `json:"body_template"` + SigningSecret string `json:"signing_secret"` + MaxRetries *int `json:"max_retries"` +} + +type previewWebhookMessageRequest struct { + SenderName string `json:"sender_name"` + SenderEmail string `json:"sender_email"` + Subject string `json:"subject"` + BodyText string `json:"body_text"` + BodyHTML string `json:"body_html"` + Date string `json:"date"` + Recipients string `json:"recipients"` + HasAttachment bool `json:"has_attachment"` + MessageID string `json:"message_id"` +} + +type previewWebhookRequest struct { + BodyTemplate string `json:"body_template"` + Message *previewWebhookMessageRequest `json:"message"` } func validateWebhookURL(raw string) *apivalidate.FieldDetail { @@ -482,7 +560,30 @@ func validateWebhookBodyTemplate(body string) *apivalidate.FieldDetail { return nil } -func validateCreateWebhook(req *createWebhookRequest) (string, *apivalidate.ValidationError) { +func validateWebhookSigningSecret(secret string) *apivalidate.FieldDetail { + if secret == "" { + return nil + } + if len(secret) > maxWebhookSecretLen { + return &apivalidate.FieldDetail{Field: "signing_secret", Message: "too long"} + } + if containsNewline(secret) { + return &apivalidate.FieldDetail{Field: "signing_secret", Message: "invalid"} + } + return nil +} + +func normalizeWebhookMaxRetries(v *int) (int, *apivalidate.FieldDetail) { + if v == nil { + return defaultWebhookRetries, nil + } + if *v < 0 || *v > maxWebhookRetries { + return 0, &apivalidate.FieldDetail{Field: "max_retries", Message: "must be between 0 and 10"} + } + return *v, nil +} + +func validateCreateWebhook(req *createWebhookRequest) (string, int, *apivalidate.ValidationError) { var details []apivalidate.FieldDetail if strings.TrimSpace(req.Name) == "" { details = append(details, apivalidate.FieldDetail{Field: "name", Message: "required"}) @@ -496,14 +597,49 @@ func validateCreateWebhook(req *createWebhookRequest) (string, *apivalidate.Vali if d != nil { details = append(details, *d) } + maxRetries, d := normalizeWebhookMaxRetries(req.MaxRetries) + if d != nil { + details = append(details, *d) + } + if d := validateWebhookSigningSecret(req.SigningSecret); d != nil { + details = append(details, *d) + } if len(details) > 0 { - return "", apivalidate.NewValidationError(details...) + return "", 0, apivalidate.NewValidationError(details...) } if verr := validateWebhookHeaders(req.Headers); verr != nil { - return "", verr + return "", 0, verr } if d := validateWebhookBodyTemplate(req.BodyTemplate); d != nil { - return "", apivalidate.NewValidationError(*d) + return "", 0, apivalidate.NewValidationError(*d) } - return method, nil + return method, maxRetries, nil +} + +func validateUpdateWebhook(req *updateWebhookRequest) (string, int, *apivalidate.ValidationError) { + return validateCreateWebhook(&createWebhookRequest{ + Name: req.Name, + URL: req.URL, + Method: req.Method, + Headers: req.Headers, + BodyTemplate: req.BodyTemplate, + SigningSecret: req.SigningSecret, + MaxRetries: req.MaxRetries, + }) +} + +func validatePreviewWebhook(req *previewWebhookRequest) *apivalidate.ValidationError { + var details []apivalidate.FieldDetail + if req.Message == nil { + details = append(details, apivalidate.FieldDetail{Field: "message", Message: "required"}) + } + if strings.TrimSpace(req.BodyTemplate) == "" { + details = append(details, apivalidate.FieldDetail{Field: "body_template", Message: "required"}) + } else if d := validateWebhookBodyTemplate(req.BodyTemplate); d != nil { + details = append(details, *d) + } + if len(details) == 0 { + return nil + } + return apivalidate.NewValidationError(details...) } diff --git a/internal/mail/rules/simulate.go b/internal/mail/rules/simulate.go new file mode 100644 index 0000000..9c40c61 --- /dev/null +++ b/internal/mail/rules/simulate.go @@ -0,0 +1,71 @@ +package rules + +import ( + "context" + "fmt" + + "github.com/ultisuite/ulti-backend/internal/mail/webhooks" +) + +type SimulatedActionResult struct { + ActionResult + SimulatedPayload string `json:"simulated_payload,omitempty"` +} + +type SimulationResult struct { + Matched bool `json:"matched"` + Actions []SimulatedActionResult `json:"actions,omitempty"` +} + +func (e *Engine) SimulateRule(ctx context.Context, conditions []Condition, actions []Action, msg *Message) SimulationResult { + if !matchesAll(conditions, msg) { + return SimulationResult{Matched: false} + } + return SimulationResult{ + Matched: true, + Actions: e.simulateActions(ctx, actions, msg), + } +} + +func (e *Engine) simulateActions(ctx context.Context, actions []Action, msg *Message) []SimulatedActionResult { + results := make([]SimulatedActionResult, 0, len(actions)) + for _, action := range actions { + results = append(results, e.simulateAction(ctx, action, msg)) + } + return results +} + +func (e *Engine) simulateAction(ctx context.Context, action Action, msg *Message) SimulatedActionResult { + switch action.Type { + case "label", "move", "archive", "delete", "mark_read": + return SimulatedActionResult{ + ActionResult: ActionResult{Type: action.Type, Value: action.Value, OK: true}, + } + case "webhook": + if e.db == nil { + return SimulatedActionResult{ + ActionResult: actionResultFrom(action, fmt.Errorf("webhook simulation unavailable")), + } + } + var bodyTemplate string + err := e.db.QueryRow(ctx, ` + SELECT body_template + FROM webhook_templates + WHERE id = $1 AND is_active = true + `, action.Value).Scan(&bodyTemplate) + if err != nil { + return SimulatedActionResult{ + ActionResult: actionResultFrom(action, fmt.Errorf("query template: %w", err)), + } + } + payload := webhooks.RenderBodyTemplate(bodyTemplate, messageToWebhookContext(msg)) + return SimulatedActionResult{ + ActionResult: ActionResult{Type: action.Type, Value: action.Value, OK: true}, + SimulatedPayload: payload, + } + default: + return SimulatedActionResult{ + ActionResult: actionResultFrom(action, fmt.Errorf("unknown action type: %s", action.Type)), + } + } +} diff --git a/internal/mail/webhooks/executor.go b/internal/mail/webhooks/executor.go index 63970e4..eaccef0 100644 --- a/internal/mail/webhooks/executor.go +++ b/internal/mail/webhooks/executor.go @@ -3,11 +3,15 @@ package webhooks import ( "bytes" "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" "io" "log/slog" "net/http" + "strconv" "strings" "time" @@ -43,76 +47,223 @@ type MessageContext struct { MessageID string `json:"message_id"` } -func (e *Executor) Execute(ctx context.Context, templateID string, msgCtx *MessageContext) error { - var ( - url string - method string - headersJSON []byte - bodyTemplate string - ) +const ( + payloadPreviewLimit = 2048 + maxResponseBodySize = 4096 + maxWebhookRetries = 10 +) - err := e.db.QueryRow(ctx, ` - SELECT url, method, headers, body_template - FROM webhook_templates - WHERE id = $1 AND is_active = true - `, templateID).Scan(&url, &method, &headersJSON, &bodyTemplate) +type templateConfig struct { + url string + method string + headersJSON []byte + bodyTemplate string + signingSecret string + maxRetries int +} + +func (e *Executor) Execute(ctx context.Context, templateID string, msgCtx *MessageContext) error { + cfg, err := e.loadTemplateConfig(ctx, templateID) if err != nil { - return fmt.Errorf("query template: %w", err) + return err + } + if cfg.maxRetries > maxWebhookRetries { + cfg.maxRetries = maxWebhookRetries } - body := interpolate(bodyTemplate, msgCtx) - - start := time.Now() - req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBufferString(body)) - if err != nil { - return fmt.Errorf("create request: %w", err) + body := interpolate(cfg.bodyTemplate, msgCtx) + payloadPreview, payloadTruncated := truncateForLog(body, payloadPreviewLimit) + if payloadTruncated { + observability.IncWebhookPayloadTruncated() } var headers map[string]string - json.Unmarshal(headersJSON, &headers) + if err := json.Unmarshal(cfg.headersJSON, &headers); err != nil { + return fmt.Errorf("parse template headers: %w", err) + } + totalStart := time.Now() + + attempts := cfg.maxRetries + 1 + var ( + lastStatusCode int + lastError error + lastRespBody string + ) + + for attempt := 1; attempt <= attempts; attempt++ { + reqStart := time.Now() + statusCode, responseBody, reqErr := e.executeAttempt(ctx, cfg, headers, body) + durationMS := time.Since(reqStart).Milliseconds() + e.logAttempt(ctx, templateID, msgCtx.MessageID, attempt, statusCode, responseBody, reqErr, durationMS, payloadPreview, payloadTruncated) + + lastStatusCode = statusCode + lastRespBody = responseBody + lastError = reqErr + + if reqErr == nil && statusCode < http.StatusBadRequest { + observability.ObserveWebhookExecution("success", statusCode, time.Since(totalStart)) + return nil + } + if attempt >= attempts || !shouldRetry(reqErr, statusCode) { + break + } + + observability.IncWebhookRetry(retryReason(reqErr, statusCode)) + if err := waitWithContext(ctx, webhookRetryDelay(attempt)); err != nil { + lastError = err + break + } + } + + observability.ObserveWebhookExecution("error", lastStatusCode, time.Since(totalStart)) + e.recordDeadLetter(ctx, templateID, msgCtx.MessageID, attempts, lastStatusCode, lastError, payloadPreview) + if lastError != nil { + return fmt.Errorf("webhook failed after retries: %w", lastError) + } + return fmt.Errorf("webhook returned %d after retries, response=%s", lastStatusCode, lastRespBody) +} + +func (e *Executor) loadTemplateConfig(ctx context.Context, templateID string) (*templateConfig, error) { + cfg := &templateConfig{} + err := e.db.QueryRow(ctx, ` + SELECT url, method, headers, body_template, signing_secret, max_retries + FROM webhook_templates + WHERE id = $1 AND is_active = true + `, templateID).Scan(&cfg.url, &cfg.method, &cfg.headersJSON, &cfg.bodyTemplate, &cfg.signingSecret, &cfg.maxRetries) + if err != nil { + return nil, fmt.Errorf("query template: %w", err) + } + if cfg.maxRetries < 0 { + cfg.maxRetries = 0 + } + return cfg, nil +} + +func (e *Executor) executeAttempt(ctx context.Context, cfg *templateConfig, headers map[string]string, body string) (statusCode int, responseBody string, err error) { + req, err := http.NewRequestWithContext(ctx, cfg.method, cfg.url, bytes.NewBufferString(body)) + if err != nil { + return 0, "", fmt.Errorf("create request: %w", err) + } + for k, v := range headers { req.Header.Set(k, v) } if req.Header.Get("Content-Type") == "" { req.Header.Set("Content-Type", "application/json") } + if cfg.signingSecret != "" { + timestamp := strconv.FormatInt(time.Now().Unix(), 10) + req.Header.Set("X-Ultimail-Signature-Timestamp", timestamp) + req.Header.Set("X-Ultimail-Signature", signPayload(cfg.signingSecret, timestamp, body)) + } resp, err := e.client.Do(req) - requestDuration := time.Since(start) - durationMS := requestDuration.Milliseconds() - - var statusCode int - var responseBody string - var execError string - if err != nil { - execError = err.Error() - } else { - statusCode = resp.StatusCode - respBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) - resp.Body.Close() - responseBody = string(respBytes) + return 0, "", fmt.Errorf("request failed: %w", err) } + defer resp.Body.Close() - _, logErr := e.db.Exec(ctx, ` - INSERT INTO webhook_logs (template_id, message_id, status_code, response_body, error, duration_ms) + respBytes, _ := io.ReadAll(io.LimitReader(resp.Body, maxResponseBodySize)) + return resp.StatusCode, string(respBytes), nil +} + +func (e *Executor) logAttempt( + ctx context.Context, + templateID, messageID string, + attempt, statusCode int, + responseBody string, + execErr error, + durationMS int64, + payloadPreview string, + payloadTruncated bool, +) { + errMsg := "" + if execErr != nil { + errMsg = execErr.Error() + } + if _, err := e.db.Exec(ctx, ` + INSERT INTO webhook_logs ( + template_id, message_id, attempt_count, status_code, response_body, error, duration_ms, payload_preview, payload_truncated + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + `, templateID, messageID, attempt, statusCode, responseBody, errMsg, durationMS, payloadPreview, payloadTruncated); err != nil { + e.logger.Error("failed to log webhook", "error", err) + } +} + +func (e *Executor) recordDeadLetter(ctx context.Context, templateID, messageID string, attempts, statusCode int, execErr error, payloadPreview string) { + errMsg := "" + if execErr != nil { + errMsg = execErr.Error() + } else if statusCode >= http.StatusBadRequest { + errMsg = fmt.Sprintf("webhook returned %d", statusCode) + } + if _, err := e.db.Exec(ctx, ` + INSERT INTO webhook_dead_letters (template_id, message_id, attempt_count, last_status_code, error, payload_preview) VALUES ($1, $2, $3, $4, $5, $6) - `, templateID, msgCtx.MessageID, statusCode, responseBody, execError, durationMS) - if logErr != nil { - e.logger.Error("failed to log webhook", "error", logErr) + `, templateID, messageID, attempts, statusCode, errMsg, payloadPreview); err != nil { + e.logger.Error("failed to write webhook dead-letter", "error", err) + return } + observability.IncWebhookDeadLetter() +} - if err != nil { - observability.ObserveWebhookExecution("error", statusCode, requestDuration) - return fmt.Errorf("request failed: %w", err) +func shouldRetry(execErr error, statusCode int) bool { + if execErr != nil { + return true } - if statusCode >= 400 { - observability.ObserveWebhookExecution("error", statusCode, requestDuration) - return fmt.Errorf("webhook returned %d", statusCode) - } - observability.ObserveWebhookExecution("success", statusCode, requestDuration) + return statusCode >= http.StatusInternalServerError || statusCode == http.StatusTooManyRequests +} - return nil +func retryReason(execErr error, statusCode int) string { + if execErr != nil { + return "network" + } + return strconv.Itoa(statusCode/100) + "xx" +} + +func webhookRetryDelay(attempt int) time.Duration { + delay := 500 * time.Millisecond + for i := 1; i < attempt; i++ { + if delay >= 8*time.Second { + return 8 * time.Second + } + delay *= 2 + } + if delay > 8*time.Second { + return 8 * time.Second + } + return delay +} + +func waitWithContext(ctx context.Context, d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + select { + case <-timer.C: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func signPayload(secret, timestamp, payload string) string { + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write([]byte(timestamp)) + mac.Write([]byte(".")) + mac.Write([]byte(payload)) + return "sha256=" + hex.EncodeToString(mac.Sum(nil)) +} + +func truncateForLog(payload string, maxLen int) (string, bool) { + if len(payload) <= maxLen { + return payload, false + } + return payload[:maxLen], true +} + +func RenderBodyTemplate(template string, ctx *MessageContext) string { + return interpolate(template, ctx) } func interpolate(template string, ctx *MessageContext) string { diff --git a/internal/mail/webhooks/executor_retry_test.go b/internal/mail/webhooks/executor_retry_test.go new file mode 100644 index 0000000..a767617 --- /dev/null +++ b/internal/mail/webhooks/executor_retry_test.go @@ -0,0 +1,46 @@ +package webhooks + +import ( + "strings" + "testing" + "time" +) + +func TestWebhookRetryDelay(t *testing.T) { + tests := []struct { + attempt int + want time.Duration + }{ + {attempt: 1, want: 500 * time.Millisecond}, + {attempt: 2, want: time.Second}, + {attempt: 3, want: 2 * time.Second}, + {attempt: 6, want: 8 * time.Second}, + {attempt: 9, want: 8 * time.Second}, + } + + for _, tt := range tests { + if got := webhookRetryDelay(tt.attempt); got != tt.want { + t.Fatalf("webhookRetryDelay(%d) = %v, want %v", tt.attempt, got, tt.want) + } + } +} + +func TestSignPayload(t *testing.T) { + got := signPayload("secret", "1716372000", `{"ok":true}`) + if !strings.HasPrefix(got, "sha256=") { + t.Fatalf("signature prefix missing: %q", got) + } + if len(got) != len("sha256=")+64 { + t.Fatalf("signature length = %d, want %d", len(got), len("sha256=")+64) + } +} + +func TestTruncateForLog(t *testing.T) { + preview, truncated := truncateForLog("abcdef", 4) + if !truncated { + t.Fatal("truncated = false, want true") + } + if preview != "abcd" { + t.Fatalf("preview = %q, want %q", preview, "abcd") + } +} diff --git a/internal/observability/metrics.go b/internal/observability/metrics.go index f523d18..dcce7ba 100644 --- a/internal/observability/metrics.go +++ b/internal/observability/metrics.go @@ -62,6 +62,21 @@ var ( Help: "Webhook execution latency in seconds.", Buckets: []float64{0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10}, }, []string{"outcome"}) + + webhookRetriesTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "ultid_webhook_retries_total", + Help: "Total number of webhook retries.", + }, []string{"reason"}) + + webhookDeadLettersTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "ultid_webhook_dead_letters_total", + Help: "Total number of webhook executions moved to dead-letter.", + }) + + webhookPayloadTruncatedTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "ultid_webhook_payload_truncated_total", + Help: "Total number of webhook payloads truncated in logs.", + }) ) type metricsResponseWriter struct { @@ -115,3 +130,15 @@ func ObserveWebhookExecution(outcome string, statusCode int, duration time.Durat webhookExecutionsTotal.WithLabelValues(outcome, statusClass).Inc() webhookDurationSeconds.WithLabelValues(outcome).Observe(duration.Seconds()) } + +func IncWebhookRetry(reason string) { + webhookRetriesTotal.WithLabelValues(reason).Inc() +} + +func IncWebhookDeadLetter() { + webhookDeadLettersTotal.Inc() +} + +func IncWebhookPayloadTruncated() { + webhookPayloadTruncatedTotal.Inc() +} diff --git a/migrations/000011_webhook_reliability.down.sql b/migrations/000011_webhook_reliability.down.sql new file mode 100644 index 0000000..51a7371 --- /dev/null +++ b/migrations/000011_webhook_reliability.down.sql @@ -0,0 +1,26 @@ +DROP INDEX IF EXISTS idx_webhook_dead_letters_template; + +DROP TABLE IF EXISTS webhook_dead_letters; + +ALTER TABLE webhook_logs + DROP COLUMN IF EXISTS payload_truncated; + +ALTER TABLE webhook_logs + DROP COLUMN IF EXISTS payload_preview; + +ALTER TABLE webhook_logs + DROP COLUMN IF EXISTS attempt_count; + +DROP TABLE IF EXISTS webhook_template_versions; + +ALTER TABLE webhook_templates + DROP CONSTRAINT IF EXISTS webhook_templates_max_retries_chk; + +ALTER TABLE webhook_templates + DROP COLUMN IF EXISTS max_retries; + +ALTER TABLE webhook_templates + DROP COLUMN IF EXISTS signing_secret; + +ALTER TABLE webhook_templates + DROP COLUMN IF EXISTS version; diff --git a/migrations/000011_webhook_reliability.up.sql b/migrations/000011_webhook_reliability.up.sql new file mode 100644 index 0000000..fdeafe3 --- /dev/null +++ b/migrations/000011_webhook_reliability.up.sql @@ -0,0 +1,59 @@ +ALTER TABLE webhook_templates + ADD COLUMN IF NOT EXISTS version INT NOT NULL DEFAULT 1; + +ALTER TABLE webhook_templates + ADD COLUMN IF NOT EXISTS signing_secret TEXT NOT NULL DEFAULT ''; + +ALTER TABLE webhook_templates + ADD COLUMN IF NOT EXISTS max_retries INT NOT NULL DEFAULT 3; + +ALTER TABLE webhook_templates + DROP CONSTRAINT IF EXISTS webhook_templates_max_retries_chk; + +ALTER TABLE webhook_templates + ADD CONSTRAINT webhook_templates_max_retries_chk + CHECK (max_retries >= 0 AND max_retries <= 10) NOT VALID; + +CREATE TABLE IF NOT EXISTS webhook_template_versions ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + template_id UUID NOT NULL REFERENCES webhook_templates(id) ON DELETE CASCADE, + version INT NOT NULL, + method TEXT NOT NULL DEFAULT 'POST', + headers JSONB NOT NULL DEFAULT '{}', + body_template TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(template_id, version) +); + +INSERT INTO webhook_template_versions (template_id, version, method, headers, body_template) +SELECT wt.id, wt.version, wt.method, wt.headers, wt.body_template +FROM webhook_templates wt +WHERE NOT EXISTS ( + SELECT 1 + FROM webhook_template_versions wtv + WHERE wtv.template_id = wt.id + AND wtv.version = wt.version +); + +ALTER TABLE webhook_logs + ADD COLUMN IF NOT EXISTS attempt_count INT NOT NULL DEFAULT 1; + +ALTER TABLE webhook_logs + ADD COLUMN IF NOT EXISTS payload_preview TEXT NOT NULL DEFAULT ''; + +ALTER TABLE webhook_logs + ADD COLUMN IF NOT EXISTS payload_truncated BOOLEAN NOT NULL DEFAULT false; + +CREATE TABLE IF NOT EXISTS webhook_dead_letters ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + template_id UUID NOT NULL REFERENCES webhook_templates(id) ON DELETE CASCADE, + message_id UUID REFERENCES messages(id) ON DELETE SET NULL, + attempt_count INT NOT NULL DEFAULT 0, + last_status_code INT NOT NULL DEFAULT 0, + error TEXT NOT NULL DEFAULT '', + payload_preview TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_webhook_dead_letters_template + ON webhook_dead_letters(template_id, created_at DESC); diff --git a/project-plan/checklist-execution.md b/project-plan/checklist-execution.md index e6652ff..bf1695e 100644 --- a/project-plan/checklist-execution.md +++ b/project-plan/checklist-execution.md @@ -113,10 +113,10 @@ Objectif: transformer état actuel (partiellement implémenté) vers produit fon - [x] Câbler réellement `rules.Engine` dans pipeline réception. - [x] Câbler réellement `webhooks.Executor` depuis actions de règles. -- [ ] Ajouter simulation/test endpoint "run rule on sample message". -- [ ] Ajouter templates webhook versionnés + preview rendu variables. -- [ ] Ajouter signatures webhook (HMAC) + retry + backoff + DLQ. -- [ ] Ajouter observabilité des exécutions (latence, erreur, payload tronqué). +- [x] Ajouter simulation/test endpoint "run rule on sample message". +- [x] Ajouter templates webhook versionnés + preview rendu variables. +- [x] Ajouter signatures webhook (HMAC) + retry + backoff + DLQ. +- [x] Ajouter observabilité des exécutions (latence, erreur, payload tronqué). ### 2.5 Realtime (`/ws`)