ultisuite-backend/internal/mail/webhooks/executor.go
R3D347HR4Y 082cac36b2 feat(automation): dispatch rules/webhooks on mail, drive, contacts
Wire automation dispatcher to IMAP sync, drive mutations, and contact CRUD.
Add webhook event_types and mail/drive/contacts scope filters (migration 30).
2026-06-07 15:51:47 +02:00

306 lines
8.8 KiB
Go

package webhooks
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strconv"
"strings"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/observability"
)
// Executor sends webhook requests built from stored templates and records the
// outcome of every delivery attempt.
type Executor struct {
// db is used to load templates and to write attempt logs and dead letters.
db *pgxpool.Pool
// client performs the outbound HTTP requests.
client *http.Client
// logger reports failures of the bookkeeping writes.
logger *slog.Logger
}
// NewExecutor returns an Executor that uses db for template lookups and
// delivery bookkeeping. Outbound requests go through a dedicated HTTP client
// with a 10-second overall timeout, and log records are emitted on the default
// slog logger tagged with component=webhooks.
func NewExecutor(db *pgxpool.Pool) *Executor {
	httpClient := &http.Client{Timeout: 10 * time.Second}
	componentLogger := slog.Default().With("component", "webhooks")

	return &Executor{
		db:     db,
		client: httpClient,
		logger: componentLogger,
	}
}
// MessageContext carries the values that can be substituted into a webhook
// body template. interpolate maps every field except HasAttachment to a
// $-prefixed placeholder; the JSON tags give the field names used when the
// struct is marshalled.
type MessageContext struct {
// Fields describing a mail message.
SenderName string `json:"sender_name"`
SenderEmail string `json:"sender_email"`
Subject string `json:"subject"`
BodyText string `json:"body_text"`
BodyHTML string `json:"body_html"`
Date string `json:"date"`
Recipients string `json:"recipients"`
// HasAttachment has no placeholder in interpolate.
HasAttachment bool `json:"has_attachment"`
// MessageID is also stored with every webhook log and dead-letter row.
MessageID string `json:"message_id"`
// Event metadata; omitted from JSON when empty.
EventType string `json:"event_type,omitempty"`
EventDomain string `json:"event_domain,omitempty"`
// Drive fields, all carried as strings (including the size and the folder
// flag); omitted from JSON when empty.
DriveFileName string `json:"drive_file_name,omitempty"`
DriveFilePath string `json:"drive_file_path,omitempty"`
DriveMimeType string `json:"drive_mime_type,omitempty"`
DriveFileSize string `json:"drive_file_size,omitempty"`
DriveIsFolder string `json:"drive_is_folder,omitempty"`
// Contact fields; omitted from JSON when empty.
ContactID string `json:"contact_id,omitempty"`
ContactName string `json:"contact_name,omitempty"`
ContactEmail string `json:"contact_email,omitempty"`
ContactPhone string `json:"contact_phone,omitempty"`
ContactOrg string `json:"contact_org,omitempty"`
}
const (
// payloadPreviewLimit is the maximum number of bytes of the rendered body
// kept as the payload preview stored with logs and dead letters.
payloadPreviewLimit = 2048
// maxResponseBodySize is the maximum number of response-body bytes read per
// attempt.
maxResponseBodySize = 4096
// maxWebhookRetries is the upper bound Execute applies to a template's
// max_retries setting.
maxWebhookRetries = 10
)
// templateConfig holds the per-template delivery settings that
// loadTemplateConfig reads from the webhook_templates table.
type templateConfig struct {
// url and method define the outbound HTTP request.
url string
method string
// headersJSON is the raw headers column; Execute decodes it into a map of
// header name to value.
headersJSON []byte
// bodyTemplate is the request body containing $-placeholders expanded by
// interpolate.
bodyTemplate string
// signingSecret, when non-empty, makes executeAttempt add the HMAC signature
// headers.
signingSecret string
// maxRetries is the number of retries allowed after the first attempt.
// Negative values are reset to 0 on load and Execute caps it at
// maxWebhookRetries.
maxRetries int
}
// Execute renders the body template of templateID with msgCtx, sends it to the
// template's URL and retries failed deliveries.
//
// The template's retry budget is capped at maxWebhookRetries. Every attempt is
// written to webhook_logs. An attempt counts as delivered when the request
// completed with a status below 400. Transport errors, 5xx and 429 responses
// are retried after a growing delay (see webhookRetryDelay); any other failure
// stops the loop immediately, as does cancellation of ctx while waiting.
//
// When delivery ultimately fails, a dead-letter row is recorded with the number
// of attempts that were actually made and an error describing the last failure
// is returned.
//
// Execute returns an error without sending anything when msgCtx is nil, the
// template cannot be loaded, or its headers cannot be decoded into a map of
// strings. A template with no stored headers sends only the default headers.
func (e *Executor) Execute(ctx context.Context, templateID string, msgCtx *MessageContext) error {
	// msgCtx is dereferenced both for rendering and for bookkeeping below.
	if msgCtx == nil {
		return fmt.Errorf("execute webhook: nil message context")
	}

	cfg, err := e.loadTemplateConfig(ctx, templateID)
	if err != nil {
		return err
	}
	if cfg.maxRetries > maxWebhookRetries {
		cfg.maxRetries = maxWebhookRetries
	}

	body := interpolate(cfg.bodyTemplate, msgCtx)
	payloadPreview, payloadTruncated := truncateForLog(body, payloadPreviewLimit)
	if payloadTruncated {
		observability.IncWebhookPayloadTruncated()
	}

	// json.Unmarshal rejects empty input, so a template whose headers column is
	// empty (or NULL) is treated as having no custom headers instead of failing.
	var headers map[string]string
	if len(cfg.headersJSON) > 0 {
		if err := json.Unmarshal(cfg.headersJSON, &headers); err != nil {
			return fmt.Errorf("parse template headers: %w", err)
		}
	}

	totalStart := time.Now()
	attempts := cfg.maxRetries + 1
	var (
		attemptsMade   int
		lastStatusCode int
		lastError      error
		lastRespBody   string
	)
	for attempt := 1; attempt <= attempts; attempt++ {
		attemptsMade = attempt

		reqStart := time.Now()
		statusCode, responseBody, reqErr := e.executeAttempt(ctx, cfg, headers, body)
		durationMS := time.Since(reqStart).Milliseconds()
		e.logAttempt(ctx, templateID, msgCtx.MessageID, attempt, statusCode, responseBody, reqErr, durationMS, payloadPreview, payloadTruncated)

		lastStatusCode = statusCode
		lastRespBody = responseBody
		lastError = reqErr

		if reqErr == nil && statusCode < http.StatusBadRequest {
			observability.ObserveWebhookExecution("success", statusCode, time.Since(totalStart))
			return nil
		}
		if attempt >= attempts || !shouldRetry(reqErr, statusCode) {
			break
		}

		observability.IncWebhookRetry(retryReason(reqErr, statusCode))
		if err := waitWithContext(ctx, webhookRetryDelay(attempt)); err != nil {
			lastError = err
			break
		}
	}

	observability.ObserveWebhookExecution("error", lastStatusCode, time.Since(totalStart))
	// Record how many attempts really happened: the loop may stop early on a
	// non-retryable status or a cancelled context, well before the budget is
	// exhausted.
	e.recordDeadLetter(ctx, templateID, msgCtx.MessageID, attemptsMade, lastStatusCode, lastError, payloadPreview)

	if lastError != nil {
		return fmt.Errorf("webhook failed after retries: %w", lastError)
	}
	return fmt.Errorf("webhook returned %d after retries, response=%s", lastStatusCode, lastRespBody)
}
// loadTemplateConfig reads the delivery settings of the active webhook template
// with the given ID. A negative max_retries value is normalised to 0. Any
// query or scan failure (including no matching active template) is returned
// wrapped as "query template".
func (e *Executor) loadTemplateConfig(ctx context.Context, templateID string) (*templateConfig, error) {
	var loaded templateConfig

	row := e.db.QueryRow(ctx, `
SELECT url, method, headers, body_template, signing_secret, max_retries
FROM webhook_templates
WHERE id = $1 AND is_active = true
`, templateID)

	scanErr := row.Scan(
		&loaded.url,
		&loaded.method,
		&loaded.headersJSON,
		&loaded.bodyTemplate,
		&loaded.signingSecret,
		&loaded.maxRetries,
	)
	if scanErr != nil {
		return nil, fmt.Errorf("query template: %w", scanErr)
	}

	// A negative retry count makes no sense; treat it as "no retries".
	if loaded.maxRetries < 0 {
		loaded.maxRetries = 0
	}
	return &loaded, nil
}
// executeAttempt performs one delivery of body using the method and URL in cfg.
//
// The template headers are applied first; Content-Type defaults to
// application/json when the template does not set it. When cfg carries a
// signing secret, the request is stamped with the current Unix time in
// X-Ultimail-Signature-Timestamp and an HMAC of "<timestamp>.<body>" in
// X-Ultimail-Signature.
//
// It returns the HTTP status code and at most maxResponseBodySize bytes of the
// response body, normalised to valid UTF-8. err is non-nil only when the
// request could not be built or sent; in that case the status code is 0 and
// the body is empty.
func (e *Executor) executeAttempt(ctx context.Context, cfg *templateConfig, headers map[string]string, body string) (statusCode int, responseBody string, err error) {
	req, err := http.NewRequestWithContext(ctx, cfg.method, cfg.url, bytes.NewBufferString(body))
	if err != nil {
		return 0, "", fmt.Errorf("create request: %w", err)
	}

	for name, value := range headers {
		req.Header.Set(name, value)
	}
	if req.Header.Get("Content-Type") == "" {
		req.Header.Set("Content-Type", "application/json")
	}

	if cfg.signingSecret != "" {
		timestamp := strconv.FormatInt(time.Now().Unix(), 10)
		req.Header.Set("X-Ultimail-Signature-Timestamp", timestamp)
		req.Header.Set("X-Ultimail-Signature", signPayload(cfg.signingSecret, timestamp, body))
	}

	resp, err := e.client.Do(req)
	if err != nil {
		return 0, "", fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	// The status code is already known at this point, so a failure while
	// reading the body does not turn the attempt into an error; whatever was
	// read is kept on a best-effort basis.
	respBytes, _ := io.ReadAll(io.LimitReader(resp.Body, maxResponseBodySize))

	// The byte-limited read can stop in the middle of a multi-byte character,
	// and the remote end may send non-UTF-8 data. The body is later stored and
	// embedded in error messages as text (presumably in a text column - confirm
	// against the schema), so invalid sequences are replaced with U+FFFD.
	return resp.StatusCode, strings.ToValidUTF8(string(respBytes), "\uFFFD"), nil
}
// logAttempt stores one delivery attempt in webhook_logs: the attempt number,
// the status code and response body received, the error text (empty when
// execErr is nil), the request duration in milliseconds and the payload
// preview with its truncation flag. A failed insert is only logged; it never
// affects the delivery itself.
func (e *Executor) logAttempt(
	ctx context.Context,
	templateID, messageID string,
	attempt, statusCode int,
	responseBody string,
	execErr error,
	durationMS int64,
	payloadPreview string,
	payloadTruncated bool,
) {
	var failureText string
	if execErr != nil {
		failureText = execErr.Error()
	}

	const insertLog = `
INSERT INTO webhook_logs (
template_id, message_id, attempt_count, status_code, response_body, error, duration_ms, payload_preview, payload_truncated
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`
	_, insertErr := e.db.Exec(ctx, insertLog,
		templateID, messageID, attempt, statusCode, responseBody, failureText, durationMS, payloadPreview, payloadTruncated)
	if insertErr == nil {
		return
	}
	e.logger.Error("failed to log webhook", "error", insertErr)
}
// recordDeadLetter stores a permanently failed delivery in
// webhook_dead_letters. The stored error text is execErr's message when
// present, otherwise "webhook returned <code>" for statuses of 400 and above,
// otherwise empty. The dead-letter metric is incremented only after the row
// was written; a failed insert is logged and otherwise ignored.
func (e *Executor) recordDeadLetter(ctx context.Context, templateID, messageID string, attempts, statusCode int, execErr error, payloadPreview string) {
	var reason string
	switch {
	case execErr != nil:
		reason = execErr.Error()
	case statusCode >= http.StatusBadRequest:
		reason = fmt.Sprintf("webhook returned %d", statusCode)
	}

	const insertDeadLetter = `
INSERT INTO webhook_dead_letters (template_id, message_id, attempt_count, last_status_code, error, payload_preview)
VALUES ($1, $2, $3, $4, $5, $6)
`
	_, insertErr := e.db.Exec(ctx, insertDeadLetter,
		templateID, messageID, attempts, statusCode, reason, payloadPreview)
	if insertErr != nil {
		e.logger.Error("failed to write webhook dead-letter", "error", insertErr)
		return
	}

	observability.IncWebhookDeadLetter()
}
// shouldRetry reports whether a failed attempt is worth repeating: always for
// transport-level errors, and for HTTP responses only on 429 or any status of
// 500 and above.
func shouldRetry(execErr error, statusCode int) bool {
	switch {
	case execErr != nil:
		return true
	case statusCode == http.StatusTooManyRequests:
		return true
	default:
		return statusCode >= http.StatusInternalServerError
	}
}
// retryReason returns the label used for the retry metric: "network" when the
// attempt failed with an error, otherwise the status class of the response
// such as "5xx" or "4xx".
func retryReason(execErr error, statusCode int) string {
	if execErr == nil {
		statusClass := statusCode / 100
		return strconv.Itoa(statusClass) + "xx"
	}
	return "network"
}
// webhookRetryDelay returns how long to wait after the given 1-based attempt
// before trying again. The delay starts at 500ms, doubles with every further
// attempt and never exceeds 8s. Attempts below 1 get the initial delay.
func webhookRetryDelay(attempt int) time.Duration {
	const (
		initialDelay = 500 * time.Millisecond
		delayCeiling = 8 * time.Second
	)

	wait := initialDelay
	// Stop doubling as soon as the ceiling is reached so large attempt numbers
	// neither loop needlessly nor overflow.
	for doublings := attempt - 1; doublings > 0 && wait < delayCeiling; doublings-- {
		wait *= 2
	}
	if wait > delayCeiling {
		wait = delayCeiling
	}
	return wait
}
// waitWithContext blocks for d or until ctx is done, whichever happens first.
// It returns nil when the full delay elapsed and ctx.Err() when the context
// ended the wait. The timer is always released before returning.
func waitWithContext(ctx context.Context, d time.Duration) error {
	pause := time.NewTimer(d)
	defer pause.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-pause.C:
		return nil
	}
}
// signPayload computes the webhook signature: an HMAC-SHA256 keyed with secret
// over the bytes of "<timestamp>.<payload>", returned as lowercase hex with a
// "sha256=" prefix.
func signPayload(secret, timestamp, payload string) string {
	signer := hmac.New(sha256.New, []byte(secret))
	// Feeding the joined message in one write yields the same digest as
	// writing its three parts separately.
	signer.Write([]byte(timestamp + "." + payload))
	digest := signer.Sum(nil)
	return "sha256=" + hex.EncodeToString(digest)
}
// truncateForLog shortens payload to at most maxLen bytes for storage as a
// preview. It returns the (possibly shortened) text and whether anything was
// cut off.
//
// The cut never lands inside a multi-byte UTF-8 character: when byte maxLen
// falls in the middle of one, the whole character is dropped, so the result
// may be up to three bytes shorter than maxLen. A negative maxLen is treated
// as 0.
func truncateForLog(payload string, maxLen int) (string, bool) {
	if maxLen < 0 {
		maxLen = 0
	}
	if len(payload) <= maxLen {
		return payload, false
	}

	// UTF-8 continuation bytes have the bit pattern 10xxxxxx. If the first
	// dropped byte is one, the cut would split a character, so step back to
	// that character's lead byte. A character has at most three continuation
	// bytes, which bounds the back-off even for malformed input.
	const maxContinuationBytes = 3
	cut := maxLen
	for cut > 0 && maxLen-cut < maxContinuationBytes && payload[cut]&0xC0 == 0x80 {
		cut--
	}
	return payload[:cut], true
}
// RenderBodyTemplate expands the $-placeholders in template with the values
// from ctx and returns the resulting body. It applies the same substitution
// Execute uses before sending a webhook. ctx must not be nil.
func RenderBodyTemplate(template string, ctx *MessageContext) string {
	rendered := interpolate(template, ctx)
	return rendered
}
// interpolate replaces every known $-placeholder in template with the matching
// field of ctx in a single pass. Placeholders that are not listed here are
// left untouched. ctx must not be nil.
//
// NOTE(review): values are inserted verbatim, without any escaping. When the
// template is a JSON document, a value containing quotes or backslashes will
// produce invalid or altered JSON - confirm whether callers sanitise values.
func interpolate(template string, ctx *MessageContext) string {
	// Flat list of placeholder/value pairs in the order strings.NewReplacer
	// expects them.
	substitutions := []string{
		"$sender.name", ctx.SenderName,
		"$sender.email", ctx.SenderEmail,
		"$subject", ctx.Subject,
		"$body.textContent", ctx.BodyText,
		"$body.htmlContent", ctx.BodyHTML,
		"$date", ctx.Date,
		"$recipients.to", ctx.Recipients,
		"$message_id", ctx.MessageID,
		"$event.type", ctx.EventType,
		"$event.domain", ctx.EventDomain,
		"$drive.file_name", ctx.DriveFileName,
		"$drive.file_path", ctx.DriveFilePath,
		"$drive.mime_type", ctx.DriveMimeType,
		"$drive.file_size", ctx.DriveFileSize,
		"$drive.is_folder", ctx.DriveIsFolder,
		"$contact.id", ctx.ContactID,
		"$contact.name", ctx.ContactName,
		"$contact.email", ctx.ContactEmail,
		"$contact.phone", ctx.ContactPhone,
		"$contact.org", ctx.ContactOrg,
	}
	return strings.NewReplacer(substitutions...).Replace(template)
}