- Added a new endpoint for simulating rules based on sample messages, allowing users to test rule conditions and actions. - Enhanced webhook management with versioning, preview capabilities, and improved validation for webhook requests. - Updated service interfaces to support new functionalities, including max retries for webhooks and signing secrets. - Implemented observability metrics for webhook retries and dead-letter tracking, improving error handling and monitoring. - Enhanced unit tests to cover new simulation and webhook features, ensuring robust functionality and validation.
282 lines
7.7 KiB
Go
282 lines
7.7 KiB
Go
package webhooks
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/hmac"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/ultisuite/ulti-backend/internal/observability"
|
|
)
|
|
|
|
type Executor struct {
|
|
db *pgxpool.Pool
|
|
client *http.Client
|
|
logger *slog.Logger
|
|
}
|
|
|
|
func NewExecutor(db *pgxpool.Pool) *Executor {
|
|
return &Executor{
|
|
db: db,
|
|
client: &http.Client{
|
|
Timeout: 10 * time.Second,
|
|
},
|
|
logger: slog.Default().With("component", "webhooks"),
|
|
}
|
|
}
|
|
|
|
type MessageContext struct {
|
|
SenderName string `json:"sender_name"`
|
|
SenderEmail string `json:"sender_email"`
|
|
Subject string `json:"subject"`
|
|
BodyText string `json:"body_text"`
|
|
BodyHTML string `json:"body_html"`
|
|
Date string `json:"date"`
|
|
Recipients string `json:"recipients"`
|
|
HasAttachment bool `json:"has_attachment"`
|
|
MessageID string `json:"message_id"`
|
|
}
|
|
|
|
const (
|
|
payloadPreviewLimit = 2048
|
|
maxResponseBodySize = 4096
|
|
maxWebhookRetries = 10
|
|
)
|
|
|
|
type templateConfig struct {
|
|
url string
|
|
method string
|
|
headersJSON []byte
|
|
bodyTemplate string
|
|
signingSecret string
|
|
maxRetries int
|
|
}
|
|
|
|
func (e *Executor) Execute(ctx context.Context, templateID string, msgCtx *MessageContext) error {
|
|
cfg, err := e.loadTemplateConfig(ctx, templateID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if cfg.maxRetries > maxWebhookRetries {
|
|
cfg.maxRetries = maxWebhookRetries
|
|
}
|
|
|
|
body := interpolate(cfg.bodyTemplate, msgCtx)
|
|
payloadPreview, payloadTruncated := truncateForLog(body, payloadPreviewLimit)
|
|
if payloadTruncated {
|
|
observability.IncWebhookPayloadTruncated()
|
|
}
|
|
|
|
var headers map[string]string
|
|
if err := json.Unmarshal(cfg.headersJSON, &headers); err != nil {
|
|
return fmt.Errorf("parse template headers: %w", err)
|
|
}
|
|
totalStart := time.Now()
|
|
|
|
attempts := cfg.maxRetries + 1
|
|
var (
|
|
lastStatusCode int
|
|
lastError error
|
|
lastRespBody string
|
|
)
|
|
|
|
for attempt := 1; attempt <= attempts; attempt++ {
|
|
reqStart := time.Now()
|
|
statusCode, responseBody, reqErr := e.executeAttempt(ctx, cfg, headers, body)
|
|
durationMS := time.Since(reqStart).Milliseconds()
|
|
e.logAttempt(ctx, templateID, msgCtx.MessageID, attempt, statusCode, responseBody, reqErr, durationMS, payloadPreview, payloadTruncated)
|
|
|
|
lastStatusCode = statusCode
|
|
lastRespBody = responseBody
|
|
lastError = reqErr
|
|
|
|
if reqErr == nil && statusCode < http.StatusBadRequest {
|
|
observability.ObserveWebhookExecution("success", statusCode, time.Since(totalStart))
|
|
return nil
|
|
}
|
|
if attempt >= attempts || !shouldRetry(reqErr, statusCode) {
|
|
break
|
|
}
|
|
|
|
observability.IncWebhookRetry(retryReason(reqErr, statusCode))
|
|
if err := waitWithContext(ctx, webhookRetryDelay(attempt)); err != nil {
|
|
lastError = err
|
|
break
|
|
}
|
|
}
|
|
|
|
observability.ObserveWebhookExecution("error", lastStatusCode, time.Since(totalStart))
|
|
e.recordDeadLetter(ctx, templateID, msgCtx.MessageID, attempts, lastStatusCode, lastError, payloadPreview)
|
|
if lastError != nil {
|
|
return fmt.Errorf("webhook failed after retries: %w", lastError)
|
|
}
|
|
return fmt.Errorf("webhook returned %d after retries, response=%s", lastStatusCode, lastRespBody)
|
|
}
|
|
|
|
func (e *Executor) loadTemplateConfig(ctx context.Context, templateID string) (*templateConfig, error) {
|
|
cfg := &templateConfig{}
|
|
err := e.db.QueryRow(ctx, `
|
|
SELECT url, method, headers, body_template, signing_secret, max_retries
|
|
FROM webhook_templates
|
|
WHERE id = $1 AND is_active = true
|
|
`, templateID).Scan(&cfg.url, &cfg.method, &cfg.headersJSON, &cfg.bodyTemplate, &cfg.signingSecret, &cfg.maxRetries)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query template: %w", err)
|
|
}
|
|
if cfg.maxRetries < 0 {
|
|
cfg.maxRetries = 0
|
|
}
|
|
return cfg, nil
|
|
}
|
|
|
|
func (e *Executor) executeAttempt(ctx context.Context, cfg *templateConfig, headers map[string]string, body string) (statusCode int, responseBody string, err error) {
|
|
req, err := http.NewRequestWithContext(ctx, cfg.method, cfg.url, bytes.NewBufferString(body))
|
|
if err != nil {
|
|
return 0, "", fmt.Errorf("create request: %w", err)
|
|
}
|
|
|
|
for k, v := range headers {
|
|
req.Header.Set(k, v)
|
|
}
|
|
if req.Header.Get("Content-Type") == "" {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
if cfg.signingSecret != "" {
|
|
timestamp := strconv.FormatInt(time.Now().Unix(), 10)
|
|
req.Header.Set("X-Ultimail-Signature-Timestamp", timestamp)
|
|
req.Header.Set("X-Ultimail-Signature", signPayload(cfg.signingSecret, timestamp, body))
|
|
}
|
|
|
|
resp, err := e.client.Do(req)
|
|
if err != nil {
|
|
return 0, "", fmt.Errorf("request failed: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
respBytes, _ := io.ReadAll(io.LimitReader(resp.Body, maxResponseBodySize))
|
|
return resp.StatusCode, string(respBytes), nil
|
|
}
|
|
|
|
func (e *Executor) logAttempt(
|
|
ctx context.Context,
|
|
templateID, messageID string,
|
|
attempt, statusCode int,
|
|
responseBody string,
|
|
execErr error,
|
|
durationMS int64,
|
|
payloadPreview string,
|
|
payloadTruncated bool,
|
|
) {
|
|
errMsg := ""
|
|
if execErr != nil {
|
|
errMsg = execErr.Error()
|
|
}
|
|
if _, err := e.db.Exec(ctx, `
|
|
INSERT INTO webhook_logs (
|
|
template_id, message_id, attempt_count, status_code, response_body, error, duration_ms, payload_preview, payload_truncated
|
|
)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
|
`, templateID, messageID, attempt, statusCode, responseBody, errMsg, durationMS, payloadPreview, payloadTruncated); err != nil {
|
|
e.logger.Error("failed to log webhook", "error", err)
|
|
}
|
|
}
|
|
|
|
func (e *Executor) recordDeadLetter(ctx context.Context, templateID, messageID string, attempts, statusCode int, execErr error, payloadPreview string) {
|
|
errMsg := ""
|
|
if execErr != nil {
|
|
errMsg = execErr.Error()
|
|
} else if statusCode >= http.StatusBadRequest {
|
|
errMsg = fmt.Sprintf("webhook returned %d", statusCode)
|
|
}
|
|
if _, err := e.db.Exec(ctx, `
|
|
INSERT INTO webhook_dead_letters (template_id, message_id, attempt_count, last_status_code, error, payload_preview)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
`, templateID, messageID, attempts, statusCode, errMsg, payloadPreview); err != nil {
|
|
e.logger.Error("failed to write webhook dead-letter", "error", err)
|
|
return
|
|
}
|
|
observability.IncWebhookDeadLetter()
|
|
}
|
|
|
|
func shouldRetry(execErr error, statusCode int) bool {
|
|
if execErr != nil {
|
|
return true
|
|
}
|
|
return statusCode >= http.StatusInternalServerError || statusCode == http.StatusTooManyRequests
|
|
}
|
|
|
|
func retryReason(execErr error, statusCode int) string {
|
|
if execErr != nil {
|
|
return "network"
|
|
}
|
|
return strconv.Itoa(statusCode/100) + "xx"
|
|
}
|
|
|
|
func webhookRetryDelay(attempt int) time.Duration {
|
|
delay := 500 * time.Millisecond
|
|
for i := 1; i < attempt; i++ {
|
|
if delay >= 8*time.Second {
|
|
return 8 * time.Second
|
|
}
|
|
delay *= 2
|
|
}
|
|
if delay > 8*time.Second {
|
|
return 8 * time.Second
|
|
}
|
|
return delay
|
|
}
|
|
|
|
func waitWithContext(ctx context.Context, d time.Duration) error {
|
|
timer := time.NewTimer(d)
|
|
defer timer.Stop()
|
|
select {
|
|
case <-timer.C:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
func signPayload(secret, timestamp, payload string) string {
|
|
mac := hmac.New(sha256.New, []byte(secret))
|
|
mac.Write([]byte(timestamp))
|
|
mac.Write([]byte("."))
|
|
mac.Write([]byte(payload))
|
|
return "sha256=" + hex.EncodeToString(mac.Sum(nil))
|
|
}
|
|
|
|
func truncateForLog(payload string, maxLen int) (string, bool) {
|
|
if len(payload) <= maxLen {
|
|
return payload, false
|
|
}
|
|
return payload[:maxLen], true
|
|
}
|
|
|
|
func RenderBodyTemplate(template string, ctx *MessageContext) string {
|
|
return interpolate(template, ctx)
|
|
}
|
|
|
|
func interpolate(template string, ctx *MessageContext) string {
|
|
r := strings.NewReplacer(
|
|
"$sender.name", ctx.SenderName,
|
|
"$sender.email", ctx.SenderEmail,
|
|
"$subject", ctx.Subject,
|
|
"$body.textContent", ctx.BodyText,
|
|
"$body.htmlContent", ctx.BodyHTML,
|
|
"$date", ctx.Date,
|
|
"$recipients.to", ctx.Recipients,
|
|
"$message_id", ctx.MessageID,
|
|
)
|
|
return r.Replace(template)
|
|
}
|