diff --git a/.env.example b/.env.example index 2ac947b..0534af6 100644 --- a/.env.example +++ b/.env.example @@ -175,6 +175,11 @@ GRAFANA_ADMIN_PASSWORD=admin MAIL_ATTACHMENTS_BUCKET=mail-attachments MAIL_SYNC_INTERVAL=2m MAIL_OUTBOX_INTERVAL=10s +MAIL_OUTBOX_MAX_RETRIES=8 +MAIL_SEND_RATE_PER_MINUTE=30 +MAIL_SEND_BURST=10 +MAIL_SMTP_CIRCUIT_FAILURES=5 +MAIL_SMTP_CIRCUIT_COOLDOWN=5m # Credentials IMAP/SMTP chiffrés AES-GCM (format keyring: key_id:base64key,key_id2:base64key2) # Rotation: ajouter nouvelle clé dans MAIL_CREDENTIAL_KEYS puis basculer MAIL_ACTIVE_CREDENTIAL_KEY_ID. # Les anciennes clés restent présentes temporairement pour déchiffrement. diff --git a/cmd/ultid/main.go b/cmd/ultid/main.go index 96d2c65..defa104 100644 --- a/cmd/ultid/main.go +++ b/cmd/ultid/main.go @@ -25,6 +25,7 @@ import ( "github.com/ultisuite/ulti-backend/internal/api/contacts" "github.com/ultisuite/ulti-backend/internal/api/drive" mailapi "github.com/ultisuite/ulti-backend/internal/api/mail" + "github.com/ultisuite/ulti-backend/internal/api/mail/sendguard" meetapi "github.com/ultisuite/ulti-backend/internal/api/meet" "github.com/ultisuite/ulti-backend/internal/api/middleware" photosapi "github.com/ultisuite/ulti-backend/internal/api/photos" @@ -139,7 +140,11 @@ func main() { go imapsync.NewSyncWorker(pool, cfg.MailSyncInterval, credentialManager).Start(ctx) sender := smtp.NewSender(pool, credentialManager) - go smtp.NewOutboxProcessor(pool, sender, cfg.MailOutboxInterval).Start(ctx) + smtpCircuit := smtp.NewCircuitBreaker(cfg.MailSMTPCircuitFailures, cfg.MailSMTPCircuitCooldown) + guardedSender := smtp.NewGuardedSender(sender, smtpCircuit) + go smtp.NewOutboxProcessor(pool, guardedSender, cfg.MailOutboxInterval, cfg.MailOutboxMaxRetries).Start(ctx) + + sendRateLimiter := sendguard.NewRateLimiter(cfg.MailSendRatePerMinute, cfg.MailSendBurst) // Router r := chi.NewRouter() @@ -147,7 +152,7 @@ func main() { r.Use(cors.Handler(cors.Options{ AllowedOrigins: []string{"*"}, AllowedMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"}, - AllowedHeaders: []string{"Authorization", "Content-Type", apiresponse.TraceIDHeader}, + AllowedHeaders: []string{"Authorization", "Content-Type", "Idempotency-Key", apiresponse.TraceIDHeader}, ExposedHeaders: []string{apiresponse.TraceIDHeader}, AllowCredentials: false, MaxAge: 300, @@ -173,7 +178,7 @@ func main() { r.Group(func(r chi.Router) { r.Use(middleware.Auth(verifier, pool, auditLogger)) - r.Mount("/api/v1/mail", mailapi.NewHandler(pool, auditLogger, credentialManager, attachmentStorage, cfg.MailAttachmentsBucket).Routes()) + r.Mount("/api/v1/mail", mailapi.NewHandler(pool, auditLogger, credentialManager, attachmentStorage, cfg.MailAttachmentsBucket, sendRateLimiter).Routes()) r.Mount("/api/v1/admin", admin.NewHandler(pool, auditLogger).Routes()) r.Get("/api/v1/search", search.NewHandler(pool).Search) diff --git a/go.mod b/go.mod index 21fdad4..e0f5c3a 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ultisuite/ulti-backend -go 1.23.0 +go 1.25.0 require ( github.com/coder/websocket v1.8.14 @@ -12,12 +12,15 @@ require ( github.com/go-chi/cors v1.2.1 github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.1 + github.com/microcosm-cc/bluemonday v1.0.27 github.com/minio/minio-go/v7 v7.0.80 github.com/prometheus/client_golang v1.23.2 github.com/redis/go-redis/v9 v9.7.0 + golang.org/x/time v0.15.0 ) require ( + github.com/aymerick/douceur v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect @@ -26,6 +29,7 @@ require ( github.com/go-ini/ini v1.67.0 // indirect github.com/go-jose/go-jose/v4 v4.0.2 // indirect github.com/goccy/go-json v0.10.3 // indirect + github.com/gorilla/css v1.0.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect diff --git a/go.sum b/go.sum index d76eff1..60227e9 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= +github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= @@ -40,6 +42,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/css v1.0.1 h1:ntNaBIghp6JmvWnxbZKANoLyuXTPZ4cAMlo6RyhlbO8= +github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A3b0= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -59,6 +63,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/microcosm-cc/bluemonday v1.0.27 h1:MpEUotklkwCSLeH+Qdx1VJgNqLlpY2KXwXFM08ygZfk= +github.com/microcosm-cc/bluemonday v1.0.27/go.mod h1:jFi9vgW+H7c3V0lb6nR74Ib/DIB5OBs92Dimizgw2cA= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.80 h1:2mdUHXEykRdY/BigLt3Iuu1otL0JTogT0Nmltg0wujk= @@ -128,6 +134,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= +golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/internal/api/apiresponse/codes.go b/internal/api/apiresponse/codes.go index bba726a..ebe5fb3 100644 --- a/internal/api/apiresponse/codes.go +++ b/internal/api/apiresponse/codes.go @@ -14,4 +14,5 @@ const ( CodeNotFound = "not_found" CodeInternal = "internal_error" CodePayloadTooLarge = "request_body_too_large" + CodeRateLimited = "rate_limited" ) diff --git a/internal/api/mail/attachments.go b/internal/api/mail/attachments.go index afbf5e7..5f84108 100644 --- a/internal/api/mail/attachments.go +++ b/internal/api/mail/attachments.go @@ -9,16 +9,17 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" + "github.com/ultisuite/ulti-backend/internal/mail/limits" "github.com/ultisuite/ulti-backend/internal/mail/storage" ) var ( - ErrAttachmentNotFound = errors.New("attachment not found") - ErrAttachmentTooLarge = errors.New("attachment too large") + ErrAttachmentNotFound = errors.New("attachment not found") + ErrAttachmentTooLarge = limits.ErrAttachmentTooLarge + ErrTooManyAttachments = limits.ErrTooManyAttachments + ErrAttachmentsTotalTooLarge = limits.ErrAttachmentsTotalTooLarge ) -const maxAttachmentSize = 25 << 20 // 25 MiB - type draftAttachmentRef struct { ID string `json:"id"` Filename string `json:"filename"` @@ -97,14 +98,27 @@ func (s *Service) UploadMessageAttachment( if s.storage == nil { return "", errors.New("object storage unavailable") } - if size > maxAttachmentSize { - return "", ErrAttachmentTooLarge + if err := limits.ValidateAttachmentSize(size); err != nil { + return "", err } userID, err := s.ensureMessageOwned(ctx, externalID, messageID) if err != nil { return "", err } + var count int + var totalSize int64 + err = s.db.QueryRow(ctx, ` + SELECT COUNT(*)::int, COALESCE(SUM(size), 0)::bigint + FROM attachments WHERE message_id = $1 + `, messageID).Scan(&count, &totalSize) + if err != nil { + return "", err + } + if err := limits.ValidateAttachmentQuota(count, totalSize, size); err != nil { + return "", err + } + objectKey := storage.MessageObjectKey(userID, messageID, filename) if err := s.storage.Put(ctx, objectKey, reader, size, contentType); err != nil { return "", err @@ -166,8 +180,8 @@ func (s *Service) UploadDraftAttachment( if s.storage == nil { return "", errors.New("object storage unavailable") } - if size > maxAttachmentSize { - return "", ErrAttachmentTooLarge + if err := limits.ValidateAttachmentSize(size); err != nil { + return "", err } userID, err := s.ResolveUserID(ctx, externalID) @@ -197,6 +211,14 @@ func (s *Service) UploadDraftAttachment( _ = json.Unmarshal(attachmentsJSON, &refs) } + var totalSize int64 + for _, ref := range refs { + totalSize += ref.Size + } + if err := limits.ValidateAttachmentQuota(len(refs), totalSize, size); err != nil { + return "", err + } + attID := uuid.NewString() refs = append(refs, draftAttachmentRef{ ID: attID, Filename: filename, ContentType: contentType, Size: size, diff --git a/internal/api/mail/drafts.go b/internal/api/mail/drafts.go index 14771b2..4f8d77e 100644 --- a/internal/api/mail/drafts.go +++ b/internal/api/mail/drafts.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/ultisuite/ulti-backend/internal/api/query" + "github.com/ultisuite/ulti-backend/internal/mail/sanitize" "github.com/ultisuite/ulti-backend/internal/mail/threading" ) @@ -264,7 +265,7 @@ func draftDetailMap( out := map[string]any{ "id": id, "account_id": accountID, "subject": subject, "to": json.RawMessage(toAddrs), "cc": json.RawMessage(ccAddrs), "bcc": json.RawMessage(bccAddrs), - "body_text": bodyText, "body_html": bodyHTML, + "body_text": bodyText, "body_html": sanitize.SanitizeHTML(bodyHTML), "in_reply_to": inReplyTo, "references": references, "attachments": json.RawMessage(attachments), "created_at": createdAt, "updated_at": updatedAt, diff --git a/internal/api/mail/handlers.go b/internal/api/mail/handlers.go index 801fc73..391306f 100644 --- a/internal/api/mail/handlers.go +++ b/internal/api/mail/handlers.go @@ -13,13 +13,16 @@ import ( "github.com/ultisuite/ulti-backend/internal/api/middleware" "github.com/ultisuite/ulti-backend/internal/api/query" "github.com/ultisuite/ulti-backend/internal/mail/credentials" + "github.com/ultisuite/ulti-backend/internal/mail/limits" + "github.com/ultisuite/ulti-backend/internal/api/mail/sendguard" "github.com/ultisuite/ulti-backend/internal/mail/storage" "github.com/ultisuite/ulti-backend/internal/securityaudit" ) type Handler struct { - svc ServiceAPI - logger *slog.Logger + svc ServiceAPI + logger *slog.Logger + sendLimiter *sendguard.RateLimiter } func NewHandlerWithService(svc ServiceAPI) *Handler { @@ -29,8 +32,17 @@ func NewHandlerWithService(svc ServiceAPI) *Handler { } } -func NewHandler(db *pgxpool.Pool, audit *securityaudit.Logger, credentialManager *credentials.Manager, objectStorage *storage.Client, attachmentsBucket string) *Handler { - return NewHandlerWithService(NewService(db, audit, credentialManager, objectStorage, attachmentsBucket)) +func NewHandler( + db *pgxpool.Pool, + audit *securityaudit.Logger, + credentialManager *credentials.Manager, + objectStorage *storage.Client, + attachmentsBucket string, + sendLimiter *sendguard.RateLimiter, +) *Handler { + h := NewHandlerWithService(NewService(db, audit, credentialManager, objectStorage, attachmentsBucket)) + h.sendLimiter = sendLimiter + return h } func (h *Handler) Routes() chi.Router { @@ -282,10 +294,26 @@ func (h *Handler) SendMessage(w http.ResponseWriter, r *http.Request) { return } - var req sendMessageRequest - if err := apivalidate.DecodeJSON(w, r, maxSendRequestBody, &req); err != nil { + if h.sendLimiter != nil { + if err := h.sendLimiter.Allow(userID); err != nil { + apiresponse.WriteError(w, r, http.StatusTooManyRequests, apiresponse.CodeRateLimited, "send rate limit exceeded", nil) + return + } + } + + idempotencyKey, ok := normalizeIdempotencyKey(r.Header.Get("Idempotency-Key")) + if !ok { + apivalidate.WriteValidationError(w, r, apivalidate.NewValidationError(apivalidate.FieldDetail{ + Field: "Idempotency-Key", Message: "invalid", + })) return } + + var req sendMessageRequest + if err := apivalidate.DecodeJSON(w, r, limits.MaxSendRequestBodyBytes, &req); err != nil { + return + } + req.IdempotencyKey = idempotencyKey if verr := validateSendMessage(&req); verr != nil { apivalidate.WriteValidationError(w, r, verr) return diff --git a/internal/api/mail/handlers_attachments.go b/internal/api/mail/handlers_attachments.go index 8232162..69b416f 100644 --- a/internal/api/mail/handlers_attachments.go +++ b/internal/api/mail/handlers_attachments.go @@ -14,10 +14,9 @@ import ( "github.com/ultisuite/ulti-backend/internal/api/apiresponse" "github.com/ultisuite/ulti-backend/internal/api/apivalidate" "github.com/ultisuite/ulti-backend/internal/api/middleware" + "github.com/ultisuite/ulti-backend/internal/mail/limits" ) -const maxMultipartBody = 26 << 20 // 26 MiB - func (h *Handler) ListMessageAttachments(w http.ResponseWriter, r *http.Request) { claims := middleware.ClaimsFromContext(r.Context()) messageID := chi.URLParam(r, "messageID") @@ -56,7 +55,7 @@ func (h *Handler) UploadMessageAttachment(w http.ResponseWriter, r *http.Request claims := middleware.ClaimsFromContext(r.Context()) messageID := chi.URLParam(r, "messageID") - if err := r.ParseMultipartForm(maxMultipartBody); err != nil { + if err := r.ParseMultipartForm(limits.MaxMultipartUploadBytes); err != nil { apiresponse.WriteError(w, r, http.StatusBadRequest, apiresponse.CodeInvalidRequest, "invalid multipart form", nil) return } @@ -88,8 +87,7 @@ func (h *Handler) UploadMessageAttachment(w http.ResponseWriter, r *http.Request apivalidate.WriteNotFound(w, r, "not found") return } - if errors.Is(err, ErrAttachmentTooLarge) { - apiresponse.WriteError(w, r, http.StatusRequestEntityTooLarge, apiresponse.CodeInvalidRequest, "attachment too large", nil) + if writeAttachmentUploadError(w, r, err) { return } h.logger.Error("upload attachment", "error", err) @@ -132,7 +130,7 @@ func (h *Handler) UploadDraftAttachment(w http.ResponseWriter, r *http.Request) claims := middleware.ClaimsFromContext(r.Context()) draftID := chi.URLParam(r, "draftID") - if err := r.ParseMultipartForm(maxMultipartBody); err != nil { + if err := r.ParseMultipartForm(limits.MaxMultipartUploadBytes); err != nil { apiresponse.WriteError(w, r, http.StatusBadRequest, apiresponse.CodeInvalidRequest, "invalid multipart form", nil) return } @@ -164,8 +162,7 @@ func (h *Handler) UploadDraftAttachment(w http.ResponseWriter, r *http.Request) apivalidate.WriteNotFound(w, r, "not found") return } - if errors.Is(err, ErrAttachmentTooLarge) { - apiresponse.WriteError(w, r, http.StatusRequestEntityTooLarge, apiresponse.CodeInvalidRequest, "attachment too large", nil) + if writeAttachmentUploadError(w, r, err) { return } h.logger.Error("upload draft attachment", "error", err) @@ -201,3 +198,16 @@ func (h *Handler) DownloadDraftAttachment(w http.ResponseWriter, r *http.Request w.Header().Set("Content-Disposition", fmt.Sprintf(`%s; filename="%s"`, disposition, filename)) _, _ = io.Copy(w, body) } + +func writeAttachmentUploadError(w http.ResponseWriter, r *http.Request, err error) bool { + switch { + case errors.Is(err, limits.ErrAttachmentTooLarge), errors.Is(err, limits.ErrAttachmentsTotalTooLarge): + apiresponse.WriteError(w, r, http.StatusRequestEntityTooLarge, apiresponse.CodeInvalidRequest, "attachment too large", nil) + return true + case errors.Is(err, limits.ErrTooManyAttachments): + apiresponse.WriteError(w, r, http.StatusBadRequest, apiresponse.CodeInvalidRequest, "too many attachments", nil) + return true + default: + return false + } +} diff --git a/internal/api/mail/handlers_drafts.go b/internal/api/mail/handlers_drafts.go index 96857ff..f44d80e 100644 --- a/internal/api/mail/handlers_drafts.go +++ b/internal/api/mail/handlers_drafts.go @@ -10,6 +10,7 @@ import ( "github.com/ultisuite/ulti-backend/internal/api/apivalidate" "github.com/ultisuite/ulti-backend/internal/api/middleware" "github.com/ultisuite/ulti-backend/internal/api/query" + "github.com/ultisuite/ulti-backend/internal/mail/limits" ) func (h *Handler) ListDrafts(w http.ResponseWriter, r *http.Request) { @@ -53,7 +54,7 @@ func (h *Handler) CreateDraft(w http.ResponseWriter, r *http.Request) { } var req draftRequest - if err := apivalidate.DecodeJSON(w, r, maxSendRequestBody, &req); err != nil { + if err := apivalidate.DecodeJSON(w, r, limits.MaxSendRequestBodyBytes, &req); err != nil { return } if verr := validateCreateDraft(&req); verr != nil { @@ -82,7 +83,7 @@ func (h *Handler) UpdateDraft(w http.ResponseWriter, r *http.Request) { claims := middleware.ClaimsFromContext(r.Context()) var req draftRequest - if err := apivalidate.DecodeJSON(w, r, maxSendRequestBody, &req); err != nil { + if err := apivalidate.DecodeJSON(w, r, limits.MaxSendRequestBodyBytes, &req); err != nil { return } if verr := validateUpdateDraft(&req); verr != nil { diff --git a/internal/api/mail/idempotency.go b/internal/api/mail/idempotency.go new file mode 100644 index 0000000..c262d01 --- /dev/null +++ b/internal/api/mail/idempotency.go @@ -0,0 +1,28 @@ +package mail + +import ( + "strings" + "unicode" +) + +const ( + maxIdempotencyKeyLen = 128 + minIdempotencyKeyLen = 8 +) + +func normalizeIdempotencyKey(raw string) (string, bool) { + key := strings.TrimSpace(raw) + if key == "" { + return "", true + } + if len(key) < minIdempotencyKeyLen || len(key) > maxIdempotencyKeyLen { + return "", false + } + for _, r := range key { + if unicode.IsLetter(r) || unicode.IsDigit(r) || r == '-' || r == '_' { + continue + } + return "", false + } + return key, true +} diff --git a/internal/api/mail/idempotency_test.go b/internal/api/mail/idempotency_test.go new file mode 100644 index 0000000..c7c85ad --- /dev/null +++ b/internal/api/mail/idempotency_test.go @@ -0,0 +1,30 @@ +package mail + +import "testing" + +func TestNormalizeIdempotencyKey(t *testing.T) { + t.Run("empty allowed", func(t *testing.T) { + key, ok := normalizeIdempotencyKey("") + if !ok || key != "" { + t.Fatalf("got %q ok=%v", key, ok) + } + }) + t.Run("valid", func(t *testing.T) { + key, ok := normalizeIdempotencyKey(" send-abc-123_456 ") + if !ok || key != "send-abc-123_456" { + t.Fatalf("got %q ok=%v", key, ok) + } + }) + t.Run("too short", func(t *testing.T) { + _, ok := normalizeIdempotencyKey("short") + if ok { + t.Fatal("expected invalid") + } + }) + t.Run("invalid chars", func(t *testing.T) { + _, ok := normalizeIdempotencyKey("bad key with spaces!!!!") + if ok { + t.Fatal("expected invalid") + } + }) +} diff --git a/internal/api/mail/sendguard/ratelimit.go b/internal/api/mail/sendguard/ratelimit.go new file mode 100644 index 0000000..2a730de --- /dev/null +++ b/internal/api/mail/sendguard/ratelimit.go @@ -0,0 +1,55 @@ +package sendguard + +import ( + "errors" + "sync" + "time" + + "golang.org/x/time/rate" +) + +var ErrSendRateLimited = errors.New("send rate limited") + +// RateLimiter limits outbound send API requests per user. +type RateLimiter struct { + mu sync.Mutex + limits map[string]*rate.Limiter + limit rate.Limit + burst int +} + +func NewRateLimiter(perMinute int, burst int) *RateLimiter { + if perMinute < 1 { + perMinute = 30 + } + if burst < 1 { + burst = 10 + } + return &RateLimiter{ + limits: make(map[string]*rate.Limiter), + limit: rate.Every(time.Minute / time.Duration(perMinute)), + burst: burst, + } +} + +func (r *RateLimiter) Allow(userID string) error { + if userID == "" { + return ErrSendRateLimited + } + lim := r.limiter(userID) + if !lim.Allow() { + return ErrSendRateLimited + } + return nil +} + +func (r *RateLimiter) limiter(userID string) *rate.Limiter { + r.mu.Lock() + defer r.mu.Unlock() + lim, ok := r.limits[userID] + if !ok { + lim = rate.NewLimiter(r.limit, r.burst) + r.limits[userID] = lim + } + return lim +} diff --git a/internal/api/mail/sendguard/ratelimit_test.go b/internal/api/mail/sendguard/ratelimit_test.go new file mode 100644 index 0000000..aac8c2f --- /dev/null +++ b/internal/api/mail/sendguard/ratelimit_test.go @@ -0,0 +1,28 @@ +package sendguard + +import ( + "testing" +) + +func TestRateLimiter_blocksBurst(t *testing.T) { + lim := NewRateLimiter(60, 2) + if err := lim.Allow("user-1"); err != nil { + t.Fatalf("first: %v", err) + } + if err := lim.Allow("user-1"); err != nil { + t.Fatalf("second: %v", err) + } + if err := lim.Allow("user-1"); err == nil { + t.Fatal("expected rate limit on third immediate request") + } +} + +func TestRateLimiter_perUser(t *testing.T) { + lim := NewRateLimiter(60, 1) + if err := lim.Allow("a"); err != nil { + t.Fatalf("user a: %v", err) + } + if err := lim.Allow("b"); err != nil { + t.Fatalf("user b should have separate bucket: %v", err) + } +} diff --git a/internal/api/mail/service.go b/internal/api/mail/service.go index e5ad40f..686bba3 100644 --- a/internal/api/mail/service.go +++ b/internal/api/mail/service.go @@ -12,6 +12,7 @@ import ( "github.com/ultisuite/ulti-backend/internal/api/query" "github.com/ultisuite/ulti-backend/internal/mail/credentials" + "github.com/ultisuite/ulti-backend/internal/mail/sanitize" "github.com/ultisuite/ulti-backend/internal/mail/storage" "github.com/ultisuite/ulti-backend/internal/mail/threading" "github.com/ultisuite/ulti-backend/internal/securityaudit" @@ -279,7 +280,7 @@ func (s *Service) GetMessage(ctx context.Context, externalID, messageID string) out := map[string]any{ "id": msg.ID, "message_id": msg.MessageID, "subject": msg.Subject, "from": json.RawMessage(msg.From), "to": json.RawMessage(msg.To), "cc": json.RawMessage(msg.Cc), - "date": msg.Date, "body_text": msg.Text, "body_html": msg.HTML, + "date": msg.Date, "body_text": msg.Text, "body_html": sanitize.SanitizeHTML(msg.HTML), "flags": msg.Flags, "labels": msg.Labels, "in_reply_to": msg.InReplyTo, "references": msg.References, } @@ -402,6 +403,22 @@ func (s *Service) loadReplyParent(ctx context.Context, userID, replyToMessageID } func (s *Service) SendMessage(ctx context.Context, userID string, req *sendMessageRequest) (id, status string, err error) { + if req.IdempotencyKey != "" { + err = s.db.QueryRow(ctx, ` + SELECT id, status FROM outbox + WHERE user_id = $1 AND idempotency_key = $2 + AND created_at > NOW() - INTERVAL '24 hours' + ORDER BY created_at DESC + LIMIT 1 + `, userID, req.IdempotencyKey).Scan(&id, &status) + if err == nil { + return id, status, nil + } + if !errors.Is(err, pgx.ErrNoRows) { + return "", "", err + } + } + toJSON, _ := json.Marshal(req.To) ccJSON, _ := json.Marshal(req.Cc) bccJSON, _ := json.Marshal(req.Bcc) @@ -424,16 +441,31 @@ func (s *Service) SendMessage(ctx context.Context, userID string, req *sendMessa } err = s.db.QueryRow(ctx, ` - INSERT INTO outbox (user_id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header, status, scheduled_at) - SELECT $1, ma.id, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12 + INSERT INTO outbox ( + user_id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, + body_text, body_html, in_reply_to, references_header, status, scheduled_at, idempotency_key + ) + SELECT $1, ma.id, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 FROM mail_accounts ma WHERE ma.id = $2 AND ma.user_id = $1 RETURNING id - `, userID, req.AccountID, toJSON, ccJSON, bccJSON, req.Subject, req.BodyText, req.BodyHTML, inReplyTo, references, status, req.ScheduleAt).Scan(&id) + `, userID, req.AccountID, toJSON, ccJSON, bccJSON, req.Subject, req.BodyText, req.BodyHTML, + inReplyTo, references, status, req.ScheduleAt, req.IdempotencyKey).Scan(&id) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return "", "", ErrAccountNotFound } + if req.IdempotencyKey != "" && isUniqueViolation(err) { + err = s.db.QueryRow(ctx, ` + SELECT id, status FROM outbox + WHERE user_id = $1 AND idempotency_key = $2 + ORDER BY created_at DESC + LIMIT 1 + `, userID, req.IdempotencyKey).Scan(&id, &status) + if err == nil { + return id, status, nil + } + } return "", "", err } return id, status, nil diff --git a/internal/api/mail/validate.go b/internal/api/mail/validate.go index c15140f..0cdf7f6 100644 --- a/internal/api/mail/validate.go +++ b/internal/api/mail/validate.go @@ -11,11 +11,11 @@ import ( "unicode" "github.com/ultisuite/ulti-backend/internal/api/apivalidate" + "github.com/ultisuite/ulti-backend/internal/mail/limits" ) const ( maxAccountRequestBody = 32 << 10 // 32 KiB - maxSendRequestBody = 5 << 20 // 5 MiB maxWebhookRequestBody = 128 << 10 // 128 KiB maxRulesRequestBody = 256 << 10 // 256 KiB maxFlagsLabelsBody = 32 << 10 // 32 KiB @@ -25,9 +25,8 @@ const ( maxHeaderNameLen = 256 maxHeaderValueLen = 8192 - maxSubjectLen = 998 - maxBodyField = 4 << 20 // 4 MiB per body field - maxEmailLen = 320 + maxSubjectLen = 998 + maxEmailLen = 320 maxHostLen = 253 maxAccountName = 128 maxUsernameLen = 256 @@ -191,6 +190,7 @@ type sendMessageRequest struct { InReplyTo string `json:"in_reply_to"` ReplyToMessageID string `json:"reply_to_message_id"` ScheduleAt *string `json:"schedule_at"` + IdempotencyKey string `json:"-"` } func validateSendMessage(req *sendMessageRequest) *apivalidate.ValidationError { @@ -223,10 +223,10 @@ func validateSendMessage(req *sendMessageRequest) *apivalidate.ValidationError { if len(req.Subject) > maxSubjectLen { details = append(details, apivalidate.FieldDetail{Field: "subject", Message: "too long"}) } - if len(req.BodyText) > maxBodyField { + if len(req.BodyText) > limits.MaxBodyFieldBytes { details = append(details, apivalidate.FieldDetail{Field: "body_text", Message: "too long"}) } - if len(req.BodyHTML) > maxBodyField { + if len(req.BodyHTML) > limits.MaxBodyFieldBytes { details = append(details, apivalidate.FieldDetail{Field: "body_html", Message: "too long"}) } if req.InReplyTo != "" && len(req.InReplyTo) > 998 { diff --git a/internal/api/mail/validate_drafts.go b/internal/api/mail/validate_drafts.go index e80f4ae..d4a11a9 100644 --- a/internal/api/mail/validate_drafts.go +++ b/internal/api/mail/validate_drafts.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/ultisuite/ulti-backend/internal/api/apivalidate" + "github.com/ultisuite/ulti-backend/internal/mail/limits" ) type draftRequest struct { @@ -49,10 +50,10 @@ func validateDraftContent(req *draftRequest) []apivalidate.FieldDetail { if len(req.Subject) > maxSubjectLen { details = append(details, apivalidate.FieldDetail{Field: "subject", Message: "too long"}) } - if len(req.BodyText) > maxBodyField { + if len(req.BodyText) > limits.MaxBodyFieldBytes { details = append(details, apivalidate.FieldDetail{Field: "body_text", Message: "too long"}) } - if len(req.BodyHTML) > maxBodyField { + if len(req.BodyHTML) > limits.MaxBodyFieldBytes { details = append(details, apivalidate.FieldDetail{Field: "body_html", Message: "too long"}) } if req.InReplyTo != "" && len(req.InReplyTo) > 998 { @@ -61,7 +62,7 @@ func validateDraftContent(req *draftRequest) []apivalidate.FieldDetail { if req.Attachments != nil { if b, err := json.Marshal(req.Attachments); err != nil { details = append(details, apivalidate.FieldDetail{Field: "attachments", Message: "invalid"}) - } else if len(b) > maxSendRequestBody { + } else if len(b) > limits.MaxSendRequestBodyBytes { details = append(details, apivalidate.FieldDetail{Field: "attachments", Message: "too large"}) } } diff --git a/internal/config/config.go b/internal/config/config.go index e82aee7..0242b1b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -56,6 +56,11 @@ type Config struct { MailAttachmentsBucket string MailSyncInterval time.Duration MailOutboxInterval time.Duration + MailOutboxMaxRetries int + MailSendRatePerMinute int + MailSendBurst int + MailSMTPCircuitFailures int + MailSMTPCircuitCooldown time.Duration MailCredentialKeys string MailActiveCredentialKeyID string MailWebhookSharedSecret string @@ -123,6 +128,11 @@ func Load() (*Config, error) { MailAttachmentsBucket: envOrDefault("MAIL_ATTACHMENTS_BUCKET", "mail-attachments"), MailSyncInterval: envDuration("MAIL_SYNC_INTERVAL", 2*time.Minute), MailOutboxInterval: envDuration("MAIL_OUTBOX_INTERVAL", 10*time.Second), + MailOutboxMaxRetries: envInt("MAIL_OUTBOX_MAX_RETRIES", 8), + MailSendRatePerMinute: envInt("MAIL_SEND_RATE_PER_MINUTE", 30), + MailSendBurst: envInt("MAIL_SEND_BURST", 10), + MailSMTPCircuitFailures: envInt("MAIL_SMTP_CIRCUIT_FAILURES", 5), + MailSMTPCircuitCooldown: envDuration("MAIL_SMTP_CIRCUIT_COOLDOWN", 5*time.Minute), MailCredentialKeys: secrets.Env("MAIL_CREDENTIAL_KEYS"), MailActiveCredentialKeyID: envOrDefault("MAIL_ACTIVE_CREDENTIAL_KEY_ID", ""), MailWebhookSharedSecret: secrets.Env("MAIL_WEBHOOK_SHARED_SECRET"), diff --git a/internal/mail/limits/limits.go b/internal/mail/limits/limits.go new file mode 100644 index 0000000..0a75267 --- /dev/null +++ b/internal/mail/limits/limits.go @@ -0,0 +1,38 @@ +package limits + +import "errors" + +// Default mail body and attachment size limits (bytes unless noted). +const ( + MaxBodyFieldBytes = 4 << 20 // 4 MiB per body_text / body_html field + MaxSendRequestBodyBytes = 5 << 20 // 5 MiB JSON send/draft request + MaxAttachmentBytes = 25 << 20 // 25 MiB per attachment file + MaxMultipartUploadBytes = 26 << 20 // 26 MiB multipart form (file + fields) + MaxAttachmentsPerMessage = 50 + MaxTotalAttachmentsPerMessageBytes = 100 << 20 // 100 MiB combined per message/draft +) + +var ( + ErrAttachmentTooLarge = errors.New("attachment too large") + ErrTooManyAttachments = errors.New("too many attachments") + ErrAttachmentsTotalTooLarge = errors.New("attachments total size exceeded") +) + +// ValidateAttachmentSize rejects a single attachment larger than MaxAttachmentBytes. +func ValidateAttachmentSize(size int64) error { + if size > MaxAttachmentBytes { + return ErrAttachmentTooLarge + } + return nil +} + +// ValidateAttachmentQuota rejects when adding newSize would exceed per-message count or total size limits. +func ValidateAttachmentQuota(existingCount int, existingTotalBytes int64, newSize int64) error { + if existingCount >= MaxAttachmentsPerMessage { + return ErrTooManyAttachments + } + if existingTotalBytes+newSize > MaxTotalAttachmentsPerMessageBytes { + return ErrAttachmentsTotalTooLarge + } + return nil +} diff --git a/internal/mail/limits/limits_test.go b/internal/mail/limits/limits_test.go new file mode 100644 index 0000000..aa33e4d --- /dev/null +++ b/internal/mail/limits/limits_test.go @@ -0,0 +1,41 @@ +package limits + +import ( + "errors" + "testing" +) + +func TestValidateAttachmentSize(t *testing.T) { + t.Parallel() + if err := ValidateAttachmentSize(MaxAttachmentBytes); err != nil { + t.Fatalf("at limit: %v", err) + } + if err := ValidateAttachmentSize(MaxAttachmentBytes + 1); !errors.Is(err, ErrAttachmentTooLarge) { + t.Fatalf("over limit: %v", err) + } +} + +func TestValidateAttachmentQuota(t *testing.T) { + t.Parallel() + cases := []struct { + name string + count int + total int64 + newSize int64 + want error + }{ + {"ok", 0, 0, 1, nil}, + {"at count", MaxAttachmentsPerMessage, 0, 1, ErrTooManyAttachments}, + {"at total", 0, MaxTotalAttachmentsPerMessageBytes, 1, ErrAttachmentsTotalTooLarge}, + {"would exceed total", 1, MaxTotalAttachmentsPerMessageBytes - 10, 11, ErrAttachmentsTotalTooLarge}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + err := ValidateAttachmentQuota(tc.count, tc.total, tc.newSize) + if !errors.Is(err, tc.want) { + t.Fatalf("got %v want %v", err, tc.want) + } + }) + } +} diff --git a/internal/mail/sanitize/sanitize.go b/internal/mail/sanitize/sanitize.go new file mode 100644 index 0000000..bd48658 --- /dev/null +++ b/internal/mail/sanitize/sanitize.go @@ -0,0 +1,12 @@ +package sanitize + +import "github.com/microcosm-cc/bluemonday" + +var policy = bluemonday.UGCPolicy() + +func SanitizeHTML(html string) string { + if html == "" { + return "" + } + return policy.Sanitize(html) +} diff --git a/internal/mail/sanitize/sanitize_test.go b/internal/mail/sanitize/sanitize_test.go new file mode 100644 index 0000000..029f97a --- /dev/null +++ b/internal/mail/sanitize/sanitize_test.go @@ -0,0 +1,42 @@ +package sanitize + +import ( + "strings" + "testing" +) + +func TestSanitizeHTML_stripsScriptTags(t *testing.T) { + in := `

Hello

World` + got := SanitizeHTML(in) + if strings.Contains(got, "script") { + t.Fatalf("expected script removed, got %q", got) + } + if !strings.Contains(got, "Hello") || !strings.Contains(got, "World") { + t.Fatalf("expected safe content preserved, got %q", got) + } +} + +func TestSanitizeHTML_stripsJavascriptURLs(t *testing.T) { + in := `clickx` + got := SanitizeHTML(in) + if strings.Contains(strings.ToLower(got), "javascript:") { + t.Fatalf("expected javascript: URLs removed, got %q", got) + } +} + +func TestSanitizeHTML_preservesSafeContent(t *testing.T) { + in := `

Hi

linkpic` + got := SanitizeHTML(in) + if !strings.Contains(got, `href="https://example.com"`) { + t.Fatalf("expected safe link preserved, got %q", got) + } + if !strings.Contains(got, `src="https://example.com/a.png"`) { + t.Fatalf("expected safe image preserved, got %q", got) + } +} + +func TestSanitizeHTML_empty(t *testing.T) { + if got := SanitizeHTML(""); got != "" { + t.Fatalf("expected empty string, got %q", got) + } +} diff --git a/internal/mail/smtp/circuit.go b/internal/mail/smtp/circuit.go new file mode 100644 index 0000000..ae02ac3 --- /dev/null +++ b/internal/mail/smtp/circuit.go @@ -0,0 +1,88 @@ +package smtp + +import ( + "errors" + "sync" + "time" +) + +var ErrCircuitOpen = errors.New("smtp circuit open") + +// CircuitBreaker tracks consecutive SMTP failures per mail account. +type CircuitBreaker struct { + threshold int + cooldown time.Duration + mu sync.Mutex + accounts map[string]*circuitState +} + +type circuitState struct { + failures int + openUntil time.Time + halfOpenTry bool +} + +func NewCircuitBreaker(threshold int, cooldown time.Duration) *CircuitBreaker { + if threshold < 1 { + threshold = 5 + } + if cooldown <= 0 { + cooldown = 5 * time.Minute + } + return &CircuitBreaker{ + threshold: threshold, + cooldown: cooldown, + accounts: make(map[string]*circuitState), + } +} + +func (cb *CircuitBreaker) Allow(accountID string) error { + cb.mu.Lock() + defer cb.mu.Unlock() + + st := cb.state(accountID) + now := time.Now() + if st.openUntil.IsZero() || now.After(st.openUntil) { + if !st.openUntil.IsZero() { + st.halfOpenTry = true + } + return nil + } + return ErrCircuitOpen +} + +func (cb *CircuitBreaker) RecordSuccess(accountID string) { + cb.mu.Lock() + defer cb.mu.Unlock() + + st := cb.state(accountID) + st.failures = 0 + st.openUntil = time.Time{} + st.halfOpenTry = false +} + +func (cb *CircuitBreaker) RecordFailure(accountID string) { + cb.mu.Lock() + defer cb.mu.Unlock() + + st := cb.state(accountID) + if st.halfOpenTry { + st.halfOpenTry = false + st.failures = cb.threshold + st.openUntil = time.Now().Add(cb.cooldown) + return + } + st.failures++ + if st.failures >= cb.threshold { + st.openUntil = time.Now().Add(cb.cooldown) + } +} + +func (cb *CircuitBreaker) state(accountID string) *circuitState { + st, ok := cb.accounts[accountID] + if !ok { + st = &circuitState{} + cb.accounts[accountID] = st + } + return st +} diff --git a/internal/mail/smtp/circuit_test.go b/internal/mail/smtp/circuit_test.go new file mode 100644 index 0000000..ecd5dd8 --- /dev/null +++ b/internal/mail/smtp/circuit_test.go @@ -0,0 +1,42 @@ +package smtp + +import ( + "errors" + "testing" + "time" +) + +func TestCircuitBreaker_opensAfterThreshold(t *testing.T) { + cb := NewCircuitBreaker(3, time.Minute) + account := "acc-1" + + for i := 0; i < 3; i++ { + if err := cb.Allow(account); err != nil { + t.Fatalf("allow %d: %v", i, err) + } + cb.RecordFailure(account) + } + if err := cb.Allow(account); !errors.Is(err, ErrCircuitOpen) { + t.Fatalf("allow after failures = %v, want %v", err, ErrCircuitOpen) + } +} + +func TestCircuitBreaker_recoversAfterCooldown(t *testing.T) { + cb := NewCircuitBreaker(1, 10*time.Millisecond) + account := "acc-2" + + _ = cb.Allow(account) + cb.RecordFailure(account) + if err := cb.Allow(account); !errors.Is(err, ErrCircuitOpen) { + t.Fatalf("expected open circuit: %v", err) + } + + time.Sleep(15 * time.Millisecond) + if err := cb.Allow(account); err != nil { + t.Fatalf("expected half-open allow: %v", err) + } + cb.RecordSuccess(account) + if err := cb.Allow(account); err != nil { + t.Fatalf("expected closed circuit: %v", err) + } +} diff --git a/internal/mail/smtp/guarded_sender.go b/internal/mail/smtp/guarded_sender.go new file mode 100644 index 0000000..802e1ea --- /dev/null +++ b/internal/mail/smtp/guarded_sender.go @@ -0,0 +1,26 @@ +package smtp + +import "context" + +// GuardedSender wraps Sender with a per-account SMTP circuit breaker. +type GuardedSender struct { + inner *Sender + circuit *CircuitBreaker +} + +func NewGuardedSender(inner *Sender, circuit *CircuitBreaker) *GuardedSender { + return &GuardedSender{inner: inner, circuit: circuit} +} + +func (g *GuardedSender) Send(ctx context.Context, req *SendRequest) error { + if err := g.circuit.Allow(req.AccountID); err != nil { + return err + } + err := g.inner.Send(ctx, req) + if err != nil { + g.circuit.RecordFailure(req.AccountID) + return err + } + g.circuit.RecordSuccess(req.AccountID) + return nil +} diff --git a/internal/mail/smtp/outbox.go b/internal/mail/smtp/outbox.go index e196897..929ecc9 100644 --- a/internal/mail/smtp/outbox.go +++ b/internal/mail/smtp/outbox.go @@ -10,19 +10,28 @@ import ( "github.com/ultisuite/ulti-backend/internal/observability" ) -type OutboxProcessor struct { - db *pgxpool.Pool - sender *Sender - logger *slog.Logger - interval time.Duration +type OutboxSender interface { + Send(ctx context.Context, req *SendRequest) error } -func NewOutboxProcessor(db *pgxpool.Pool, sender *Sender, interval time.Duration) *OutboxProcessor { +type OutboxProcessor struct { + db *pgxpool.Pool + sender OutboxSender + logger *slog.Logger + interval time.Duration + maxRetries int +} + +func NewOutboxProcessor(db *pgxpool.Pool, sender OutboxSender, interval time.Duration, maxRetries int) *OutboxProcessor { + if maxRetries < 1 { + maxRetries = DefaultMaxOutboxRetries + } return &OutboxProcessor{ - db: db, - sender: sender, - logger: slog.Default().With("component", "outbox"), - interval: interval, + db: db, + sender: sender, + logger: slog.Default().With("component", "outbox"), + interval: interval, + maxRetries: maxRetries, } } @@ -51,11 +60,12 @@ func (p *OutboxProcessor) processQueue(ctx context.Context) { WHERE id IN ( SELECT id FROM outbox WHERE status = 'queued' + AND (next_retry_at IS NULL OR next_retry_at <= NOW()) ORDER BY created_at ASC LIMIT 10 FOR UPDATE SKIP LOCKED ) - RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header + RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header, retry_count `) if err != nil { p.logger.Error("failed to query outbox", "error", err) @@ -75,9 +85,10 @@ func (p *OutboxProcessor) processQueue(ctx context.Context) { bodyHTML string inReplyTo string references []string + retryCount int ) - if err := rows.Scan(&id, &accountID, &toJSON, &ccJSON, &bccJSON, &subject, &bodyText, &bodyHTML, &inReplyTo, &references); err != nil { + if err := rows.Scan(&id, &accountID, &toJSON, &ccJSON, &bccJSON, &subject, &bodyText, &bodyHTML, &inReplyTo, &references, &retryCount); err != nil { p.logger.Error("scan outbox row", "error", err) continue } @@ -116,10 +127,21 @@ func (p *OutboxProcessor) processQueue(ctx context.Context) { if err := p.sender.Send(ctx, req); err != nil { p.logger.Error("send failed", "outbox_id", id, "error", err) observability.IncOutboxProcessed("error") + nextRetry := time.Now().Add(OutboxRetryDelay(retryCount)) + newRetry := retryCount + 1 + status := "queued" + if newRetry >= p.maxRetries { + status = "failed" + } if _, execErr := p.db.Exec(ctx, ` - UPDATE outbox SET status = 'queued', retry_count = retry_count + 1, error = $2, updated_at = NOW() + UPDATE outbox SET + status = $2, + retry_count = $3, + next_retry_at = $4, + error = $5, + updated_at = NOW() WHERE id = $1 - `, id, err.Error()); execErr != nil { + `, id, status, newRetry, nextRetry, err.Error()); execErr != nil { p.logger.Error("failed to mark outbox retry", "outbox_id", id, "error", execErr) } } else { diff --git a/internal/mail/smtp/retry.go b/internal/mail/smtp/retry.go new file mode 100644 index 0000000..72ddb50 --- /dev/null +++ b/internal/mail/smtp/retry.go @@ -0,0 +1,28 @@ +package smtp + +import "time" + +const ( + DefaultMaxOutboxRetries = 8 + maxRetryDelay = time.Hour + baseRetryDelay = 30 * time.Second +) + +// OutboxRetryDelay returns exponential backoff before the next send attempt. +func OutboxRetryDelay(retryCount int) time.Duration { + if retryCount <= 0 { + return baseRetryDelay + } + delay := baseRetryDelay + for i := 0; i < retryCount && delay < maxRetryDelay; i++ { + if delay > maxRetryDelay/2 { + delay = maxRetryDelay + break + } + delay *= 2 + } + if delay > maxRetryDelay { + return maxRetryDelay + } + return delay +} diff --git a/internal/mail/smtp/retry_test.go b/internal/mail/smtp/retry_test.go new file mode 100644 index 0000000..cfaec36 --- /dev/null +++ b/internal/mail/smtp/retry_test.go @@ -0,0 +1,27 @@ +package smtp + +import ( + "testing" + "time" +) + +func TestOutboxRetryDelay_exponentialCap(t *testing.T) { + d0 := OutboxRetryDelay(0) + if d0 != baseRetryDelay { + t.Fatalf("retry 0 = %v, want %v", d0, baseRetryDelay) + } + d3 := OutboxRetryDelay(3) + if d3 != 4*time.Minute { + t.Fatalf("retry 3 = %v, want 4m", d3) + } + dLarge := OutboxRetryDelay(20) + if dLarge != maxRetryDelay { + t.Fatalf("retry 20 = %v, want cap %v", dLarge, maxRetryDelay) + } + if OutboxRetryDelay(1) <= OutboxRetryDelay(0) { + t.Fatal("expected increasing backoff") + } + if OutboxRetryDelay(10) > time.Hour { + t.Fatal("delay must not exceed one hour") + } +} diff --git a/migrations/000008_outbox_hardening.down.sql b/migrations/000008_outbox_hardening.down.sql new file mode 100644 index 0000000..6ff0e4b --- /dev/null +++ b/migrations/000008_outbox_hardening.down.sql @@ -0,0 +1,6 @@ +DROP INDEX IF EXISTS idx_outbox_queued_retry; +DROP INDEX IF EXISTS idx_outbox_user_idempotency; + +ALTER TABLE outbox + DROP COLUMN IF EXISTS next_retry_at, + DROP COLUMN IF EXISTS idempotency_key; diff --git a/migrations/000008_outbox_hardening.up.sql b/migrations/000008_outbox_hardening.up.sql new file mode 100644 index 0000000..505a749 --- /dev/null +++ b/migrations/000008_outbox_hardening.up.sql @@ -0,0 +1,11 @@ +ALTER TABLE outbox + ADD COLUMN IF NOT EXISTS idempotency_key TEXT NOT NULL DEFAULT '', + ADD COLUMN IF NOT EXISTS next_retry_at TIMESTAMPTZ; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_outbox_user_idempotency + ON outbox (user_id, idempotency_key) + WHERE idempotency_key <> ''; + +CREATE INDEX IF NOT EXISTS idx_outbox_queued_retry + ON outbox (status, next_retry_at, created_at) + WHERE status = 'queued'; diff --git a/project-plan/checklist-execution.md b/project-plan/checklist-execution.md index 5ee7320..e7eadd2 100644 --- a/project-plan/checklist-execution.md +++ b/project-plan/checklist-execution.md @@ -86,10 +86,10 @@ Objectif: transformer état actuel (partiellement implémenté) vers produit fon #### Hardening -- [ ] Limiter taille body/attachments. -- [ ] Sanitizer HTML côté lecture. -- [ ] Protection anti-abus envoi (rate limit, retry backoff, circuit breaker SMTP). -- [ ] Idempotency key sur envoi. +- [x] Limiter taille body/attachments. +- [x] Sanitizer HTML côté lecture. +- [x] Protection anti-abus envoi (rate limit, retry backoff, circuit breaker SMTP). +- [x] Idempotency key sur envoi. ### 2.2 Sync IMAP & pipeline mail