package webhooks import ( "bytes" "context" "crypto/hmac" "crypto/sha256" "encoding/hex" "encoding/json" "fmt" "io" "log/slog" "net/http" "strconv" "strings" "time" "github.com/jackc/pgx/v5/pgxpool" "github.com/ultisuite/ulti-backend/internal/observability" ) type Executor struct { db *pgxpool.Pool client *http.Client logger *slog.Logger } func NewExecutor(db *pgxpool.Pool) *Executor { return &Executor{ db: db, client: &http.Client{ Timeout: 10 * time.Second, }, logger: slog.Default().With("component", "webhooks"), } } type MessageContext struct { SenderName string `json:"sender_name"` SenderEmail string `json:"sender_email"` Subject string `json:"subject"` BodyText string `json:"body_text"` BodyHTML string `json:"body_html"` Date string `json:"date"` Recipients string `json:"recipients"` HasAttachment bool `json:"has_attachment"` MessageID string `json:"message_id"` EventType string `json:"event_type,omitempty"` EventDomain string `json:"event_domain,omitempty"` DriveFileName string `json:"drive_file_name,omitempty"` DriveFilePath string `json:"drive_file_path,omitempty"` DriveMimeType string `json:"drive_mime_type,omitempty"` DriveFileSize string `json:"drive_file_size,omitempty"` DriveIsFolder string `json:"drive_is_folder,omitempty"` ContactID string `json:"contact_id,omitempty"` ContactName string `json:"contact_name,omitempty"` ContactEmail string `json:"contact_email,omitempty"` ContactPhone string `json:"contact_phone,omitempty"` ContactOrg string `json:"contact_org,omitempty"` } const ( payloadPreviewLimit = 2048 maxResponseBodySize = 4096 maxWebhookRetries = 10 ) type templateConfig struct { url string method string headersJSON []byte bodyTemplate string signingSecret string maxRetries int } func (e *Executor) Execute(ctx context.Context, templateID string, msgCtx *MessageContext) error { cfg, err := e.loadTemplateConfig(ctx, templateID) if err != nil { return err } if cfg.maxRetries > maxWebhookRetries { cfg.maxRetries = maxWebhookRetries } body := interpolate(cfg.bodyTemplate, msgCtx) payloadPreview, payloadTruncated := truncateForLog(body, payloadPreviewLimit) if payloadTruncated { observability.IncWebhookPayloadTruncated() } var headers map[string]string if err := json.Unmarshal(cfg.headersJSON, &headers); err != nil { return fmt.Errorf("parse template headers: %w", err) } totalStart := time.Now() attempts := cfg.maxRetries + 1 var ( lastStatusCode int lastError error lastRespBody string ) for attempt := 1; attempt <= attempts; attempt++ { reqStart := time.Now() statusCode, responseBody, reqErr := e.executeAttempt(ctx, cfg, headers, body) durationMS := time.Since(reqStart).Milliseconds() e.logAttempt(ctx, templateID, msgCtx.MessageID, attempt, statusCode, responseBody, reqErr, durationMS, payloadPreview, payloadTruncated) lastStatusCode = statusCode lastRespBody = responseBody lastError = reqErr if reqErr == nil && statusCode < http.StatusBadRequest { observability.ObserveWebhookExecution("success", statusCode, time.Since(totalStart)) return nil } if attempt >= attempts || !shouldRetry(reqErr, statusCode) { break } observability.IncWebhookRetry(retryReason(reqErr, statusCode)) if err := waitWithContext(ctx, webhookRetryDelay(attempt)); err != nil { lastError = err break } } observability.ObserveWebhookExecution("error", lastStatusCode, time.Since(totalStart)) e.recordDeadLetter(ctx, templateID, msgCtx.MessageID, attempts, lastStatusCode, lastError, payloadPreview) if lastError != nil { return fmt.Errorf("webhook failed after retries: %w", lastError) } return fmt.Errorf("webhook returned %d after retries, response=%s", lastStatusCode, lastRespBody) } func (e *Executor) loadTemplateConfig(ctx context.Context, templateID string) (*templateConfig, error) { cfg := &templateConfig{} err := e.db.QueryRow(ctx, ` SELECT url, method, headers, body_template, signing_secret, max_retries FROM webhook_templates WHERE id = $1 AND is_active = true `, templateID).Scan(&cfg.url, &cfg.method, &cfg.headersJSON, &cfg.bodyTemplate, &cfg.signingSecret, &cfg.maxRetries) if err != nil { return nil, fmt.Errorf("query template: %w", err) } if cfg.maxRetries < 0 { cfg.maxRetries = 0 } return cfg, nil } func (e *Executor) executeAttempt(ctx context.Context, cfg *templateConfig, headers map[string]string, body string) (statusCode int, responseBody string, err error) { req, err := http.NewRequestWithContext(ctx, cfg.method, cfg.url, bytes.NewBufferString(body)) if err != nil { return 0, "", fmt.Errorf("create request: %w", err) } for k, v := range headers { req.Header.Set(k, v) } if req.Header.Get("Content-Type") == "" { req.Header.Set("Content-Type", "application/json") } if cfg.signingSecret != "" { timestamp := strconv.FormatInt(time.Now().Unix(), 10) req.Header.Set("X-Ultimail-Signature-Timestamp", timestamp) req.Header.Set("X-Ultimail-Signature", signPayload(cfg.signingSecret, timestamp, body)) } resp, err := e.client.Do(req) if err != nil { return 0, "", fmt.Errorf("request failed: %w", err) } defer resp.Body.Close() respBytes, _ := io.ReadAll(io.LimitReader(resp.Body, maxResponseBodySize)) return resp.StatusCode, string(respBytes), nil } func (e *Executor) logAttempt( ctx context.Context, templateID, messageID string, attempt, statusCode int, responseBody string, execErr error, durationMS int64, payloadPreview string, payloadTruncated bool, ) { errMsg := "" if execErr != nil { errMsg = execErr.Error() } if _, err := e.db.Exec(ctx, ` INSERT INTO webhook_logs ( template_id, message_id, attempt_count, status_code, response_body, error, duration_ms, payload_preview, payload_truncated ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) `, templateID, messageID, attempt, statusCode, responseBody, errMsg, durationMS, payloadPreview, payloadTruncated); err != nil { e.logger.Error("failed to log webhook", "error", err) } } func (e *Executor) recordDeadLetter(ctx context.Context, templateID, messageID string, attempts, statusCode int, execErr error, payloadPreview string) { errMsg := "" if execErr != nil { errMsg = execErr.Error() } else if statusCode >= http.StatusBadRequest { errMsg = fmt.Sprintf("webhook returned %d", statusCode) } if _, err := e.db.Exec(ctx, ` INSERT INTO webhook_dead_letters (template_id, message_id, attempt_count, last_status_code, error, payload_preview) VALUES ($1, $2, $3, $4, $5, $6) `, templateID, messageID, attempts, statusCode, errMsg, payloadPreview); err != nil { e.logger.Error("failed to write webhook dead-letter", "error", err) return } observability.IncWebhookDeadLetter() } func shouldRetry(execErr error, statusCode int) bool { if execErr != nil { return true } return statusCode >= http.StatusInternalServerError || statusCode == http.StatusTooManyRequests } func retryReason(execErr error, statusCode int) string { if execErr != nil { return "network" } return strconv.Itoa(statusCode/100) + "xx" } func webhookRetryDelay(attempt int) time.Duration { delay := 500 * time.Millisecond for i := 1; i < attempt; i++ { if delay >= 8*time.Second { return 8 * time.Second } delay *= 2 } if delay > 8*time.Second { return 8 * time.Second } return delay } func waitWithContext(ctx context.Context, d time.Duration) error { timer := time.NewTimer(d) defer timer.Stop() select { case <-timer.C: return nil case <-ctx.Done(): return ctx.Err() } } func signPayload(secret, timestamp, payload string) string { mac := hmac.New(sha256.New, []byte(secret)) mac.Write([]byte(timestamp)) mac.Write([]byte(".")) mac.Write([]byte(payload)) return "sha256=" + hex.EncodeToString(mac.Sum(nil)) } func truncateForLog(payload string, maxLen int) (string, bool) { if len(payload) <= maxLen { return payload, false } return payload[:maxLen], true } func RenderBodyTemplate(template string, ctx *MessageContext) string { return interpolate(template, ctx) } func interpolate(template string, ctx *MessageContext) string { r := strings.NewReplacer( "$sender.name", ctx.SenderName, "$sender.email", ctx.SenderEmail, "$subject", ctx.Subject, "$body.textContent", ctx.BodyText, "$body.htmlContent", ctx.BodyHTML, "$date", ctx.Date, "$recipients.to", ctx.Recipients, "$message_id", ctx.MessageID, "$event.type", ctx.EventType, "$event.domain", ctx.EventDomain, "$drive.file_name", ctx.DriveFileName, "$drive.file_path", ctx.DriveFilePath, "$drive.mime_type", ctx.DriveMimeType, "$drive.file_size", ctx.DriveFileSize, "$drive.is_folder", ctx.DriveIsFolder, "$contact.id", ctx.ContactID, "$contact.name", ctx.ContactName, "$contact.email", ctx.ContactEmail, "$contact.phone", ctx.ContactPhone, "$contact.org", ctx.ContactOrg, ) return r.Replace(template) }