From 65fc9e517aa039870a005b238b91e1224c6b5d5f Mon Sep 17 00:00:00 2001 From: R3D347HR4Y Date: Fri, 22 May 2026 17:46:30 +0200 Subject: [PATCH] Implement outbox management features with scheduling and attachment support - Added new API endpoints for sending, rescheduling, and canceling scheduled outbox messages. - Implemented outbox processing logic to handle attachments and manage message statuses. - Introduced a dead-letter strategy for failed outbox messages, enhancing reliability. - Updated database schema to support new outbox statuses and dead-letter entries. - Enhanced unit tests for outbox functionalities, ensuring robust error handling and validation. - Improved attachment handling in the outbox processor to support inline and regular attachments. --- cmd/ultid/main.go | 8 +- internal/api/mail/handlers.go | 6 +- internal/api/mail/handlers_outbox.go | 97 ++++++ internal/api/mail/handlers_test.go | 228 ++++++++++++++ internal/api/mail/outbox_scheduling.go | 73 +++++ internal/api/mail/service_iface.go | 4 + internal/api/mail/validate.go | 25 +- internal/mail/smtp/attachments.go | 92 ++++++ internal/mail/smtp/attachments_test.go | 92 ++++++ internal/mail/smtp/mime.go | 187 +++++++++++ internal/mail/smtp/mime_test.go | 267 ++++++++++++++++ internal/mail/smtp/outbox.go | 290 ++++++++++++++---- internal/mail/smtp/sender.go | 60 +--- .../000010_outbox_status_dead_letter.down.sql | 5 + .../000010_outbox_status_dead_letter.up.sql | 17 + project-plan/checklist-execution.md | 10 +- 16 files changed, 1351 insertions(+), 110 deletions(-) create mode 100644 internal/api/mail/handlers_outbox.go create mode 100644 internal/api/mail/outbox_scheduling.go create mode 100644 internal/mail/smtp/attachments.go create mode 100644 internal/mail/smtp/attachments_test.go create mode 100644 internal/mail/smtp/mime.go create mode 100644 internal/mail/smtp/mime_test.go create mode 100644 migrations/000010_outbox_status_dead_letter.down.sql create mode 100644 migrations/000010_outbox_status_dead_letter.up.sql diff --git a/cmd/ultid/main.go b/cmd/ultid/main.go index 17dc092..bd9b5ab 100644 --- a/cmd/ultid/main.go +++ b/cmd/ultid/main.go @@ -151,7 +151,13 @@ func main() { sender := smtp.NewSender(pool, credentialManager) smtpCircuit := smtp.NewCircuitBreaker(cfg.MailSMTPCircuitFailures, cfg.MailSMTPCircuitCooldown) guardedSender := smtp.NewGuardedSender(sender, smtpCircuit) - go smtp.NewOutboxProcessor(pool, guardedSender, cfg.MailOutboxInterval, cfg.MailOutboxMaxRetries).Start(ctx) + go smtp.NewOutboxProcessor( + pool, + guardedSender, + cfg.MailOutboxInterval, + cfg.MailOutboxMaxRetries, + smtp.WithAttachmentLoader(&smtp.StorageAttachmentLoader{Client: attachmentStorage}), + ).Start(ctx) sendRateLimiter := sendguard.NewRateLimiter(cfg.MailSendRatePerMinute, cfg.MailSendBurst) diff --git a/internal/api/mail/handlers.go b/internal/api/mail/handlers.go index 391306f..450c0b1 100644 --- a/internal/api/mail/handlers.go +++ b/internal/api/mail/handlers.go @@ -10,11 +10,11 @@ import ( "github.com/ultisuite/ulti-backend/internal/api/apiresponse" "github.com/ultisuite/ulti-backend/internal/api/apivalidate" + "github.com/ultisuite/ulti-backend/internal/api/mail/sendguard" "github.com/ultisuite/ulti-backend/internal/api/middleware" "github.com/ultisuite/ulti-backend/internal/api/query" "github.com/ultisuite/ulti-backend/internal/mail/credentials" "github.com/ultisuite/ulti-backend/internal/mail/limits" - "github.com/ultisuite/ulti-backend/internal/api/mail/sendguard" "github.com/ultisuite/ulti-backend/internal/mail/storage" "github.com/ultisuite/ulti-backend/internal/securityaudit" ) @@ -88,6 +88,10 @@ func (h *Handler) Routes() chi.Router { r.Post("/send", h.SendMessage) + r.Post("/outbox/{outboxID}/send-now", h.SendOutboxNow) + r.Post("/outbox/{outboxID}/reschedule", h.RescheduleOutbox) + r.Post("/outbox/{outboxID}/cancel", h.CancelScheduledOutbox) + r.Get("/rules", h.ListRules) r.Post("/rules", h.CreateRule) r.Put("/rules/{ruleID}", h.UpdateRule) diff --git a/internal/api/mail/handlers_outbox.go b/internal/api/mail/handlers_outbox.go new file mode 100644 index 0000000..f6a393c --- /dev/null +++ b/internal/api/mail/handlers_outbox.go @@ -0,0 +1,97 @@ +package mail + +import ( + "errors" + "net/http" + + "github.com/go-chi/chi/v5" + + "github.com/ultisuite/ulti-backend/internal/api/apiresponse" + "github.com/ultisuite/ulti-backend/internal/api/apivalidate" + "github.com/ultisuite/ulti-backend/internal/api/middleware" +) + +func (h *Handler) SendOutboxNow(w http.ResponseWriter, r *http.Request) { + claims := middleware.ClaimsFromContext(r.Context()) + userID, err := h.svc.ResolveUserID(r.Context(), claims.Sub) + if err != nil { + h.writeUserResolveError(w, r, err) + return + } + + outboxID := chi.URLParam(r, "outboxID") + status, err := h.svc.SendOutboxNow(r.Context(), userID, outboxID) + if err != nil { + if h.writeOutboxActionError(w, r, err) { + return + } + h.logger.Error("send outbox now", "error", err, "outbox_id", outboxID) + apivalidate.WriteInternal(w, r) + return + } + apiresponse.WriteJSON(w, http.StatusAccepted, map[string]string{"id": outboxID, "status": status}) +} + +func (h *Handler) RescheduleOutbox(w http.ResponseWriter, r *http.Request) { + claims := middleware.ClaimsFromContext(r.Context()) + userID, err := h.svc.ResolveUserID(r.Context(), claims.Sub) + if err != nil { + h.writeUserResolveError(w, r, err) + return + } + + var req rescheduleOutboxRequest + if err := apivalidate.DecodeJSON(w, r, maxOutboxScheduleBody, &req); err != nil { + return + } + scheduledAt, verr := validateRescheduleOutbox(&req) + if verr != nil { + apivalidate.WriteValidationError(w, r, verr) + return + } + + outboxID := chi.URLParam(r, "outboxID") + status, err := h.svc.RescheduleOutbox(r.Context(), userID, outboxID, *scheduledAt) + if err != nil { + if h.writeOutboxActionError(w, r, err) { + return + } + h.logger.Error("reschedule outbox", "error", err, "outbox_id", outboxID) + apivalidate.WriteInternal(w, r) + return + } + apiresponse.WriteJSON(w, http.StatusOK, map[string]string{"id": outboxID, "status": status}) +} + +func (h *Handler) CancelScheduledOutbox(w http.ResponseWriter, r *http.Request) { + claims := middleware.ClaimsFromContext(r.Context()) + userID, err := h.svc.ResolveUserID(r.Context(), claims.Sub) + if err != nil { + h.writeUserResolveError(w, r, err) + return + } + + outboxID := chi.URLParam(r, "outboxID") + status, err := h.svc.CancelScheduledOutbox(r.Context(), userID, outboxID) + if err != nil { + if h.writeOutboxActionError(w, r, err) { + return + } + h.logger.Error("cancel scheduled outbox", "error", err, "outbox_id", outboxID) + apivalidate.WriteInternal(w, r) + return + } + apiresponse.WriteJSON(w, http.StatusOK, map[string]string{"id": outboxID, "status": status}) +} + +func (h *Handler) writeOutboxActionError(w http.ResponseWriter, r *http.Request, err error) bool { + if errors.Is(err, ErrNotFound) { + apivalidate.WriteNotFound(w, r, "not found") + return true + } + if errors.Is(err, ErrInvalidOutboxStatus) { + apiresponse.WriteError(w, r, http.StatusConflict, apiresponse.CodeInvalidRequest, "invalid outbox status", nil) + return true + } + return false +} diff --git a/internal/api/mail/handlers_test.go b/internal/api/mail/handlers_test.go index e509b77..adbe129 100644 --- a/internal/api/mail/handlers_test.go +++ b/internal/api/mail/handlers_test.go @@ -8,6 +8,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/go-chi/chi/v5" @@ -129,6 +130,78 @@ func (f *fakeMailService) SendMessage(_ context.Context, userID string, req *sen return "outbox-1", "queued", nil } +func (f *fakeMailService) SendOutboxNow(context.Context, string, string) (string, error) { + return "", ErrNotFound +} + +func (f *fakeMailService) RescheduleOutbox(context.Context, string, string, time.Time) (string, error) { + return "", ErrNotFound +} + +func (f *fakeMailService) CancelScheduledOutbox(context.Context, string, string) (string, error) { + return "", ErrNotFound +} + +type outboxFakeService struct { + fakeMailService + items map[string]string +} + +func newOutboxFakeService() *outboxFakeService { + return &outboxFakeService{ + fakeMailService: *newFakeMailService(), + items: map[string]string{ + "outbox-scheduled": "scheduled", + "outbox-queued": "queued", + }, + } +} + +func (f *outboxFakeService) SendOutboxNow(_ context.Context, userID, outboxID string) (string, error) { + if userID != testUserID { + return "", ErrNotFound + } + status, ok := f.items[outboxID] + if !ok { + return "", ErrNotFound + } + if status != "scheduled" { + return "", ErrInvalidOutboxStatus + } + f.items[outboxID] = "queued" + return "queued", nil +} + +func (f *outboxFakeService) RescheduleOutbox(_ context.Context, userID, outboxID string, _ time.Time) (string, error) { + if userID != testUserID { + return "", ErrNotFound + } + status, ok := f.items[outboxID] + if !ok { + return "", ErrNotFound + } + if status != "scheduled" && status != "queued" { + return "", ErrInvalidOutboxStatus + } + f.items[outboxID] = "scheduled" + return "scheduled", nil +} + +func (f *outboxFakeService) CancelScheduledOutbox(_ context.Context, userID, outboxID string) (string, error) { + if userID != testUserID { + return "", ErrNotFound + } + status, ok := f.items[outboxID] + if !ok { + return "", ErrNotFound + } + if status != "scheduled" { + return "", ErrInvalidOutboxStatus + } + f.items[outboxID] = "cancelled" + return "cancelled", nil +} + func (f *fakeMailService) ListDrafts(context.Context, string, query.ListParams) (DraftsList, error) { return DraftsList{}, nil } @@ -568,3 +641,158 @@ func TestDeleteMessage(t *testing.T) { t.Fatalf("get after delete status = %d, want %d", rec.Code, http.StatusNotFound) } } + +func TestSendOutboxNow(t *testing.T) { + svc := newOutboxFakeService() + router := newTestMailRouter(svc) + + t.Run("happy path", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-scheduled/send-now", nil) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusAccepted { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusAccepted, rec.Body.String()) + } + + var resp map[string]string + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("decode body: %v", err) + } + if resp["id"] != "outbox-scheduled" || resp["status"] != "queued" { + t.Fatalf("response = %#v", resp) + } + if svc.items["outbox-scheduled"] != "queued" { + t.Fatalf("item status = %q, want queued", svc.items["outbox-scheduled"]) + } + }) + + t.Run("invalid status", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-queued/send-now", nil) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusConflict, rec.Body.String()) + } + }) + + t.Run("not found", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/outbox/missing/send-now", nil) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusNotFound, rec.Body.String()) + } + }) +} + +func TestRescheduleOutbox(t *testing.T) { + svc := newOutboxFakeService() + router := newTestMailRouter(svc) + + future := time.Now().Add(2 * time.Hour).UTC().Format(time.RFC3339) + + t.Run("happy path", func(t *testing.T) { + body, err := json.Marshal(map[string]string{"schedule_at": future}) + if err != nil { + t.Fatalf("marshal payload: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-scheduled/reschedule", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusOK, rec.Body.String()) + } + + var resp map[string]string + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("decode body: %v", err) + } + if resp["status"] != "scheduled" { + t.Fatalf("response = %#v", resp) + } + }) + + t.Run("invalid schedule_at", func(t *testing.T) { + body, err := json.Marshal(map[string]string{"schedule_at": "not-a-date"}) + if err != nil { + t.Fatalf("marshal payload: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-scheduled/reschedule", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusBadRequest, rec.Body.String()) + } + }) + + t.Run("invalid status", func(t *testing.T) { + svc.items["outbox-sent"] = "sent" + body, err := json.Marshal(map[string]string{"schedule_at": future}) + if err != nil { + t.Fatalf("marshal payload: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-sent/reschedule", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusConflict, rec.Body.String()) + } + }) +} + +func TestCancelScheduledOutbox(t *testing.T) { + svc := newOutboxFakeService() + router := newTestMailRouter(svc) + + t.Run("happy path", func(t *testing.T) { + svc.items["outbox-cancel-me"] = "scheduled" + + req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-cancel-me/cancel", nil) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusOK, rec.Body.String()) + } + + var resp map[string]string + if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil { + t.Fatalf("decode body: %v", err) + } + if resp["status"] != "cancelled" { + t.Fatalf("response = %#v", resp) + } + }) + + t.Run("invalid status", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-queued/cancel", nil) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusConflict, rec.Body.String()) + } + }) + + t.Run("not found", func(t *testing.T) { + req := httptest.NewRequest(http.MethodPost, "/outbox/missing/cancel", nil) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusNotFound, rec.Body.String()) + } + }) +} diff --git a/internal/api/mail/outbox_scheduling.go b/internal/api/mail/outbox_scheduling.go new file mode 100644 index 0000000..8776cac --- /dev/null +++ b/internal/api/mail/outbox_scheduling.go @@ -0,0 +1,73 @@ +package mail + +import ( + "context" + "errors" + "time" + + "github.com/jackc/pgx/v5" +) + +var ErrInvalidOutboxStatus = errors.New("invalid outbox status") + +func (s *Service) SendOutboxNow(ctx context.Context, userID, outboxID string) (string, error) { + var status string + err := s.db.QueryRow(ctx, ` + UPDATE outbox SET status = 'queued', scheduled_at = NULL, updated_at = NOW() + WHERE id = $1 AND user_id = $2 AND status = 'scheduled' + RETURNING status + `, outboxID, userID).Scan(&status) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return "", s.outboxActionError(ctx, userID, outboxID) + } + return "", err + } + return status, nil +} + +func (s *Service) RescheduleOutbox(ctx context.Context, userID, outboxID string, scheduledAt time.Time) (string, error) { + var status string + err := s.db.QueryRow(ctx, ` + UPDATE outbox SET status = 'scheduled', scheduled_at = $3, updated_at = NOW() + WHERE id = $1 AND user_id = $2 AND status IN ('scheduled', 'queued') + RETURNING status + `, outboxID, userID, scheduledAt).Scan(&status) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return "", s.outboxActionError(ctx, userID, outboxID) + } + return "", err + } + return status, nil +} + +func (s *Service) CancelScheduledOutbox(ctx context.Context, userID, outboxID string) (string, error) { + var status string + err := s.db.QueryRow(ctx, ` + UPDATE outbox SET status = 'cancelled', scheduled_at = NULL, updated_at = NOW() + WHERE id = $1 AND user_id = $2 AND status = 'scheduled' + RETURNING status + `, outboxID, userID).Scan(&status) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return "", s.outboxActionError(ctx, userID, outboxID) + } + return "", err + } + return status, nil +} + +func (s *Service) outboxActionError(ctx context.Context, userID, outboxID string) error { + var status string + err := s.db.QueryRow(ctx, ` + SELECT status FROM outbox WHERE id = $1 AND user_id = $2 + `, outboxID, userID).Scan(&status) + if errors.Is(err, pgx.ErrNoRows) { + return ErrNotFound + } + if err != nil { + return err + } + return ErrInvalidOutboxStatus +} diff --git a/internal/api/mail/service_iface.go b/internal/api/mail/service_iface.go index 78d9cfa..271c9ad 100644 --- a/internal/api/mail/service_iface.go +++ b/internal/api/mail/service_iface.go @@ -3,6 +3,7 @@ package mail import ( "context" "io" + "time" "github.com/ultisuite/ulti-backend/internal/api/query" ) @@ -21,6 +22,9 @@ type ServiceAPI interface { DeleteMessage(ctx context.Context, externalID, messageID string) error GetThread(ctx context.Context, externalID, threadID string) (map[string]any, error) SendMessage(ctx context.Context, userID string, req *sendMessageRequest) (id, status string, err error) + SendOutboxNow(ctx context.Context, userID, outboxID string) (status string, err error) + RescheduleOutbox(ctx context.Context, userID, outboxID string, scheduledAt time.Time) (status string, err error) + CancelScheduledOutbox(ctx context.Context, userID, outboxID string) (status string, err error) ListDrafts(ctx context.Context, externalID string, params query.ListParams) (DraftsList, error) GetDraft(ctx context.Context, externalID, draftID string) (map[string]any, error) CreateDraft(ctx context.Context, userID string, req *draftRequest) (string, error) diff --git a/internal/api/mail/validate.go b/internal/api/mail/validate.go index 0cdf7f6..1538c11 100644 --- a/internal/api/mail/validate.go +++ b/internal/api/mail/validate.go @@ -8,6 +8,7 @@ import ( "net/url" "strconv" "strings" + "time" "unicode" "github.com/ultisuite/ulti-backend/internal/api/apivalidate" @@ -19,14 +20,15 @@ const ( maxWebhookRequestBody = 128 << 10 // 128 KiB maxRulesRequestBody = 256 << 10 // 256 KiB maxFlagsLabelsBody = 32 << 10 // 32 KiB + maxOutboxScheduleBody = 4 << 10 // 4 KiB maxWebhookBodyTemplate = 64 << 10 // 64 KiB maxWebhookHeaders = 20 maxHeaderNameLen = 256 maxHeaderValueLen = 8192 - maxSubjectLen = 998 - maxEmailLen = 320 + maxSubjectLen = 998 + maxEmailLen = 320 maxHostLen = 253 maxAccountName = 128 maxUsernameLen = 256 @@ -193,6 +195,25 @@ type sendMessageRequest struct { IdempotencyKey string `json:"-"` } +type rescheduleOutboxRequest struct { + ScheduleAt string `json:"schedule_at"` +} + +func validateRescheduleOutbox(req *rescheduleOutboxRequest) (*time.Time, *apivalidate.ValidationError) { + scheduleAt := strings.TrimSpace(req.ScheduleAt) + if scheduleAt == "" { + return nil, apivalidate.NewValidationError(apivalidate.FieldDetail{Field: "schedule_at", Message: "required"}) + } + parsed, err := time.Parse(time.RFC3339, scheduleAt) + if err != nil { + return nil, apivalidate.NewValidationError(apivalidate.FieldDetail{Field: "schedule_at", Message: "invalid"}) + } + if !parsed.After(time.Now()) { + return nil, apivalidate.NewValidationError(apivalidate.FieldDetail{Field: "schedule_at", Message: "must be in the future"}) + } + return &parsed, nil +} + func validateSendMessage(req *sendMessageRequest) *apivalidate.ValidationError { var details []apivalidate.FieldDetail if strings.TrimSpace(req.AccountID) == "" { diff --git a/internal/mail/smtp/attachments.go b/internal/mail/smtp/attachments.go new file mode 100644 index 0000000..8501e95 --- /dev/null +++ b/internal/mail/smtp/attachments.go @@ -0,0 +1,92 @@ +package smtp + +import ( + "context" + "encoding/json" + "fmt" + "io" + + "github.com/ultisuite/ulti-backend/internal/mail/storage" +) + +// SendAttachment is a MIME body part ready for embedding in an outbound message. +type SendAttachment struct { + Filename string + ContentType string + ContentID string + IsInline bool + Data []byte +} + +// AttachmentLoader fetches attachment bytes referenced by outbox JSON. +type AttachmentLoader interface { + Load(ctx context.Context, s3Key string) ([]byte, error) +} + +// StorageAttachmentLoader loads attachment bytes from object storage. +type StorageAttachmentLoader struct { + Client *storage.Client +} + +func (l *StorageAttachmentLoader) Load(ctx context.Context, s3Key string) ([]byte, error) { + if l == nil || l.Client == nil { + return nil, fmt.Errorf("object storage not configured") + } + obj, err := l.Client.Get(ctx, s3Key) + if err != nil { + return nil, err + } + defer obj.Close() + return io.ReadAll(obj) +} + +type outboxAttachmentRef struct { + ID string `json:"id"` + Filename string `json:"filename"` + ContentType string `json:"content_type"` + Size int64 `json:"size"` + S3Bucket string `json:"s3_bucket"` + S3Key string `json:"s3_key"` + ContentID string `json:"content_id,omitempty"` + IsInline bool `json:"is_inline"` +} + +func parseOutboxAttachmentsJSON(data []byte) ([]outboxAttachmentRef, error) { + if len(data) == 0 || string(data) == "[]" { + return nil, nil + } + var refs []outboxAttachmentRef + if err := json.Unmarshal(data, &refs); err != nil { + return nil, fmt.Errorf("parse outbox attachments: %w", err) + } + return refs, nil +} + +func resolveOutboxAttachments(ctx context.Context, loader AttachmentLoader, data []byte) ([]SendAttachment, error) { + refs, err := parseOutboxAttachmentsJSON(data) + if err != nil { + return nil, err + } + if len(refs) == 0 { + return nil, nil + } + if loader == nil { + return nil, fmt.Errorf("attachment loader not configured") + } + + attachments := make([]SendAttachment, 0, len(refs)) + for _, ref := range refs { + payload, err := loader.Load(ctx, ref.S3Key) + if err != nil { + return nil, fmt.Errorf("load attachment %q: %w", ref.Filename, err) + } + attachments = append(attachments, SendAttachment{ + Filename: ref.Filename, + ContentType: ref.ContentType, + ContentID: ref.ContentID, + IsInline: ref.IsInline, + Data: payload, + }) + } + return attachments, nil +} diff --git a/internal/mail/smtp/attachments_test.go b/internal/mail/smtp/attachments_test.go new file mode 100644 index 0000000..13227d4 --- /dev/null +++ b/internal/mail/smtp/attachments_test.go @@ -0,0 +1,92 @@ +package smtp + +import ( + "context" + "errors" + "reflect" + "testing" +) + +func TestParseOutboxAttachmentsJSON(t *testing.T) { + data := []byte(`[{ + "id":"att-1", + "filename":"doc.pdf", + "content_type":"application/pdf", + "size":12, + "s3_bucket":"mail-attachments", + "s3_key":"user/drafts/id/doc.pdf", + "content_id":"logo@ultimail", + "is_inline":true + }]`) + + refs, err := parseOutboxAttachmentsJSON(data) + if err != nil { + t.Fatalf("parseOutboxAttachmentsJSON() error = %v", err) + } + if len(refs) != 1 { + t.Fatalf("len(refs) = %d, want 1", len(refs)) + } + if refs[0].Filename != "doc.pdf" || !refs[0].IsInline || refs[0].ContentID != "logo@ultimail" { + t.Fatalf("unexpected ref: %+v", refs[0]) + } +} + +func TestParseOutboxAttachmentsJSON_empty(t *testing.T) { + for _, data := range [][]byte{nil, []byte{}, []byte("[]")} { + refs, err := parseOutboxAttachmentsJSON(data) + if err != nil { + t.Fatalf("parseOutboxAttachmentsJSON(%q) error = %v", data, err) + } + if refs != nil { + t.Fatalf("parseOutboxAttachmentsJSON(%q) = %v, want nil", data, refs) + } + } +} + +type stubAttachmentLoader struct { + data map[string][]byte + err error +} + +func (s stubAttachmentLoader) Load(_ context.Context, s3Key string) ([]byte, error) { + if s.err != nil { + return nil, s.err + } + payload, ok := s.data[s3Key] + if !ok { + return nil, errors.New("missing object") + } + return payload, nil +} + +func TestResolveOutboxAttachments(t *testing.T) { + data := []byte(`[{ + "filename":"note.txt", + "content_type":"text/plain", + "s3_key":"drafts/note.txt" + }]`) + loader := stubAttachmentLoader{data: map[string][]byte{ + "drafts/note.txt": []byte("hello"), + }} + + got, err := resolveOutboxAttachments(context.Background(), loader, data) + if err != nil { + t.Fatalf("resolveOutboxAttachments() error = %v", err) + } + want := []SendAttachment{{ + Filename: "note.txt", + ContentType: "text/plain", + Data: []byte("hello"), + }} + if !reflect.DeepEqual(got, want) { + t.Fatalf("resolveOutboxAttachments() = %+v, want %+v", got, want) + } +} + +func TestResolveOutboxAttachments_requiresLoader(t *testing.T) { + data := []byte(`[{"filename":"note.txt","s3_key":"drafts/note.txt"}]`) + _, err := resolveOutboxAttachments(context.Background(), nil, data) + if err == nil { + t.Fatal("expected error when loader is nil") + } +} diff --git a/internal/mail/smtp/mime.go b/internal/mail/smtp/mime.go new file mode 100644 index 0000000..740fa90 --- /dev/null +++ b/internal/mail/smtp/mime.go @@ -0,0 +1,187 @@ +package smtp + +import ( + "bytes" + "encoding/base64" + "fmt" + "io" + "mime" + "mime/multipart" + "net/textproto" + "strings" + "time" +) + +func buildMessage(req *SendRequest) string { + if len(req.Attachments) == 0 { + return buildMessageWithoutAttachments(req) + } + return buildMessageWithAttachments(req) +} + +func buildMessageWithoutAttachments(req *SendRequest) string { + var b strings.Builder + writeCommonHeaders(&b, req) + + if req.BodyHTML != "" { + boundary := fmt.Sprintf("----=_Part_%d", time.Now().UnixNano()) + b.WriteString(fmt.Sprintf("Content-Type: multipart/alternative; boundary=\"%s\"\r\n", boundary)) + b.WriteString("\r\n") + b.WriteString(fmt.Sprintf("--%s\r\n", boundary)) + b.WriteString("Content-Type: text/plain; charset=UTF-8\r\n\r\n") + b.WriteString(req.BodyText) + b.WriteString(fmt.Sprintf("\r\n--%s\r\n", boundary)) + b.WriteString("Content-Type: text/html; charset=UTF-8\r\n\r\n") + b.WriteString(req.BodyHTML) + b.WriteString(fmt.Sprintf("\r\n--%s--\r\n", boundary)) + } else { + b.WriteString("Content-Type: text/plain; charset=UTF-8\r\n\r\n") + b.WriteString(req.BodyText) + } + + return b.String() +} + +func buildMessageWithAttachments(req *SendRequest) string { + body := &bytes.Buffer{} + mixed := multipart.NewWriter(body) + + _ = writeMessageBodyPart(mixed, req.BodyText, req.BodyHTML) + for _, att := range req.Attachments { + _ = writeAttachmentPart(mixed, att) + } + _ = mixed.Close() + + var head strings.Builder + writeCommonHeaders(&head, req) + head.WriteString(fmt.Sprintf("Content-Type: multipart/mixed; boundary=\"%s\"\r\n\r\n", mixed.Boundary())) + head.Write(body.Bytes()) + return head.String() +} + +func writeCommonHeaders(b *strings.Builder, req *SendRequest) { + b.WriteString(fmt.Sprintf("From: %s\r\n", req.From)) + b.WriteString(fmt.Sprintf("To: %s\r\n", strings.Join(req.To, ", "))) + if len(req.Cc) > 0 { + b.WriteString(fmt.Sprintf("Cc: %s\r\n", strings.Join(req.Cc, ", "))) + } + b.WriteString(fmt.Sprintf("Subject: %s\r\n", req.Subject)) + b.WriteString(fmt.Sprintf("Date: %s\r\n", time.Now().Format(time.RFC1123Z))) + b.WriteString(fmt.Sprintf("Message-ID: %s\r\n", generateMessageID(req.From))) + b.WriteString("MIME-Version: 1.0\r\n") + + if req.InReplyTo != "" { + b.WriteString(fmt.Sprintf("In-Reply-To: %s\r\n", req.InReplyTo)) + } + if len(req.References) > 0 { + b.WriteString(fmt.Sprintf("References: %s\r\n", strings.Join(req.References, " "))) + } +} + +func writeMessageBodyPart(w *multipart.Writer, text, html string) error { + if html != "" { + altBuf := &bytes.Buffer{} + alt := multipart.NewWriter(altBuf) + + if err := writeTextPart(alt, "text/plain; charset=UTF-8", text); err != nil { + return err + } + if err := writeTextPart(alt, "text/html; charset=UTF-8", html); err != nil { + return err + } + if err := alt.Close(); err != nil { + return err + } + + h := textproto.MIMEHeader{} + h.Set("Content-Type", fmt.Sprintf("multipart/alternative; boundary=\"%s\"", alt.Boundary())) + part, err := w.CreatePart(h) + if err != nil { + return err + } + _, err = part.Write(altBuf.Bytes()) + return err + } + + h := textproto.MIMEHeader{} + h.Set("Content-Type", "text/plain; charset=UTF-8") + part, err := w.CreatePart(h) + if err != nil { + return err + } + _, err = part.Write([]byte(text)) + return err +} + +func writeTextPart(w *multipart.Writer, contentType, body string) error { + h := textproto.MIMEHeader{} + h.Set("Content-Type", contentType) + part, err := w.CreatePart(h) + if err != nil { + return err + } + _, err = part.Write([]byte(body)) + return err +} + +func writeAttachmentPart(w *multipart.Writer, att SendAttachment) error { + contentType := att.ContentType + if contentType == "" { + contentType = "application/octet-stream" + } + + h := textproto.MIMEHeader{} + if att.Filename != "" { + h.Set("Content-Type", mime.FormatMediaType(contentType, map[string]string{ + "name": att.Filename, + })) + } else { + h.Set("Content-Type", contentType) + } + h.Set("Content-Transfer-Encoding", "base64") + + disposition := "attachment" + if att.IsInline { + disposition = "inline" + } + if att.Filename != "" { + h.Set("Content-Disposition", mime.FormatMediaType(disposition, map[string]string{ + "filename": att.Filename, + })) + } else { + h.Set("Content-Disposition", disposition) + } + + if att.IsInline && att.ContentID != "" { + cid := att.ContentID + if !strings.HasPrefix(cid, "<") { + cid = "<" + cid + ">" + } + h.Set("Content-ID", cid) + } + + part, err := w.CreatePart(h) + if err != nil { + return err + } + return writeBase64(part, att.Data) +} + +func writeBase64(w io.Writer, data []byte) error { + encoded := base64.StdEncoding.EncodeToString(data) + for i := 0; i < len(encoded); i += 76 { + end := i + 76 + if end > len(encoded) { + end = len(encoded) + } + if _, err := io.WriteString(w, encoded[i:end]); err != nil { + return err + } + if end < len(encoded) { + if _, err := io.WriteString(w, "\r\n"); err != nil { + return err + } + } + } + return nil +} diff --git a/internal/mail/smtp/mime_test.go b/internal/mail/smtp/mime_test.go new file mode 100644 index 0000000..5e37e9c --- /dev/null +++ b/internal/mail/smtp/mime_test.go @@ -0,0 +1,267 @@ +package smtp + +import ( + "bytes" + "encoding/base64" + "io" + "mime" + "mime/multipart" + "net/mail" + "strings" + "testing" +) + +func TestBuildMessage_plainOnly(t *testing.T) { + msg := buildMessage(&SendRequest{ + From: "alice@example.com", + To: []string{"bob@example.com"}, + Subject: "Hello", + BodyText: "Plain body", + }) + + if strings.Contains(msg, "multipart/") { + t.Fatal("plain-only message must not be multipart") + } + if !strings.Contains(msg, "Content-Type: text/plain; charset=UTF-8") { + t.Fatal("missing text/plain content type") + } + if !strings.Contains(msg, "Plain body") { + t.Fatal("missing body text") + } +} + +func TestBuildMessage_alternativeOnly(t *testing.T) { + msg := buildMessage(&SendRequest{ + From: "alice@example.com", + To: []string{"bob@example.com"}, + Subject: "Hello", + BodyText: "Plain body", + BodyHTML: "

HTML body

", + }) + + if !strings.Contains(msg, "Content-Type: multipart/alternative") { + t.Fatal("missing multipart/alternative top-level content type") + } + if strings.Contains(msg, "multipart/mixed") { + t.Fatal("alternative-only message must not be multipart/mixed") + } + if !strings.Contains(msg, "Content-Type: text/html; charset=UTF-8") { + t.Fatal("missing text/html part") + } +} + +func TestBuildMessage_mixedWithPlainAttachment(t *testing.T) { + payload := []byte("file-bytes") + msg := buildMessage(&SendRequest{ + From: "alice@example.com", + To: []string{"bob@example.com"}, + Subject: "With file", + BodyText: "See attached", + Attachments: []SendAttachment{{ + Filename: "doc.pdf", + ContentType: "application/pdf", + Data: payload, + }}, + }) + + parsed, err := mail.ReadMessage(strings.NewReader(msg)) + if err != nil { + t.Fatalf("ReadMessage: %v", err) + } + + mediaType, params, err := mime.ParseMediaType(parsed.Header.Get("Content-Type")) + if err != nil { + t.Fatalf("ParseMediaType: %v", err) + } + if mediaType != "multipart/mixed" { + t.Fatalf("top-level content type = %q, want multipart/mixed", mediaType) + } + + parts := readAllParts(t, parsed.Body, params["boundary"]) + if len(parts) != 2 { + t.Fatalf("part count = %d, want 2", len(parts)) + } + + if got := parts[0].Header.Get("Content-Type"); got != "text/plain; charset=UTF-8" { + t.Fatalf("body part content type = %q", got) + } + if string(parts[0].Body) != "See attached" { + t.Fatalf("body part = %q", parts[0].Body) + } + + attType, attParams, err := mime.ParseMediaType(parts[1].Header.Get("Content-Type")) + if err != nil { + t.Fatalf("ParseMediaType attachment: %v", err) + } + if attType != "application/pdf" { + t.Fatalf("attachment content type = %q", attType) + } + if attParams["name"] != "doc.pdf" { + t.Fatalf("attachment name = %q", attParams["name"]) + } + if parts[1].Header.Get("Content-Transfer-Encoding") != "base64" { + t.Fatal("attachment missing base64 transfer encoding") + } + disp, dispParams, err := mime.ParseMediaType(parts[1].Header.Get("Content-Disposition")) + if err != nil { + t.Fatalf("ParseMediaType disposition: %v", err) + } + if disp != "attachment" { + t.Fatalf("disposition = %q, want attachment", disp) + } + if dispParams["filename"] != "doc.pdf" { + t.Fatalf("filename = %q", dispParams["filename"]) + } + if !bytes.Equal(decodeBase64Part(t, parts[1].Body), payload) { + t.Fatal("attachment payload mismatch") + } +} + +func TestBuildMessage_mixedWithAlternativeAndAttachment(t *testing.T) { + msg := buildMessage(&SendRequest{ + From: "alice@example.com", + To: []string{"bob@example.com"}, + Subject: "Rich mail", + BodyText: "Plain", + BodyHTML: "HTML", + Attachments: []SendAttachment{{ + Filename: "note.txt", + ContentType: "text/plain", + Data: []byte("attachment"), + }}, + }) + + parsed, err := mail.ReadMessage(strings.NewReader(msg)) + if err != nil { + t.Fatalf("ReadMessage: %v", err) + } + + mediaType, params, err := mime.ParseMediaType(parsed.Header.Get("Content-Type")) + if err != nil { + t.Fatalf("ParseMediaType: %v", err) + } + if mediaType != "multipart/mixed" { + t.Fatalf("top-level content type = %q, want multipart/mixed", mediaType) + } + + parts := readAllParts(t, parsed.Body, params["boundary"]) + if len(parts) != 2 { + t.Fatalf("part count = %d, want 2", len(parts)) + } + + altType, altParams, err := mime.ParseMediaType(parts[0].Header.Get("Content-Type")) + if err != nil { + t.Fatalf("ParseMediaType alternative: %v", err) + } + if altType != "multipart/alternative" { + t.Fatalf("first part type = %q, want multipart/alternative", altType) + } + + altParts := readAllParts(t, bytes.NewReader(parts[0].Body), altParams["boundary"]) + if len(altParts) != 2 { + t.Fatalf("alternative part count = %d, want 2", len(altParts)) + } + if string(altParts[0].Body) != "Plain" { + t.Fatalf("plain part = %q", altParts[0].Body) + } + if string(altParts[1].Body) != "HTML" { + t.Fatalf("html part = %q", altParts[1].Body) + } + + disp, _, err := mime.ParseMediaType(parts[1].Header.Get("Content-Disposition")) + if err != nil { + t.Fatalf("ParseMediaType disposition: %v", err) + } + if disp != "attachment" { + t.Fatalf("attachment disposition = %q", disp) + } +} + +func TestBuildMessage_inlineAttachmentHeaders(t *testing.T) { + msg := buildMessage(&SendRequest{ + From: "alice@example.com", + To: []string{"bob@example.com"}, + Subject: "Inline", + BodyHTML: ``, + BodyText: "Logo", + Attachments: []SendAttachment{{ + Filename: "logo.png", + ContentType: "image/png", + ContentID: "logo@ultimail", + IsInline: true, + Data: []byte{0x89, 0x50, 0x4e, 0x47}, + }}, + }) + + parsed, err := mail.ReadMessage(strings.NewReader(msg)) + if err != nil { + t.Fatalf("ReadMessage: %v", err) + } + + _, params, err := mime.ParseMediaType(parsed.Header.Get("Content-Type")) + if err != nil { + t.Fatalf("ParseMediaType: %v", err) + } + + parts := readAllParts(t, parsed.Body, params["boundary"]) + if len(parts) != 2 { + t.Fatalf("part count = %d, want 2", len(parts)) + } + + inline := parts[1] + disp, _, err := mime.ParseMediaType(inline.Header.Get("Content-Disposition")) + if err != nil { + t.Fatalf("ParseMediaType disposition: %v", err) + } + if disp != "inline" { + t.Fatalf("disposition = %q, want inline", disp) + } + if inline.Header.Get("Content-ID") != "" { + t.Fatalf("content-id = %q", inline.Header.Get("Content-ID")) + } +} + +type mimePart struct { + Header mail.Header + Body []byte +} + +func readAllParts(t *testing.T, r io.Reader, boundary string) []mimePart { + t.Helper() + + mr := multipart.NewReader(r, boundary) + var parts []mimePart + for { + part, err := mr.NextPart() + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("NextPart: %v", err) + } + body, err := io.ReadAll(part) + if err != nil { + t.Fatalf("ReadAll part: %v", err) + } + parts = append(parts, mimePart{ + Header: mail.Header(part.Header), + Body: body, + }) + } + return parts +} + +func decodeBase64Part(t *testing.T, raw []byte) []byte { + t.Helper() + clean := strings.Map(func(r rune) rune { + if r == '\r' || r == '\n' { + return -1 + } + return r + }, string(raw)) + out, err := base64.StdEncoding.DecodeString(clean) + if err != nil { + t.Fatalf("DecodeString: %v", err) + } + return out +} diff --git a/internal/mail/smtp/outbox.go b/internal/mail/smtp/outbox.go index 929ecc9..3cad3a4 100644 --- a/internal/mail/smtp/outbox.go +++ b/internal/mail/smtp/outbox.go @@ -3,9 +3,13 @@ package smtp import ( "context" "encoding/json" + "errors" + "fmt" "log/slog" + "strings" "time" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/ultisuite/ulti-backend/internal/observability" ) @@ -15,24 +19,37 @@ type OutboxSender interface { } type OutboxProcessor struct { - db *pgxpool.Pool - sender OutboxSender - logger *slog.Logger - interval time.Duration - maxRetries int + db *pgxpool.Pool + sender OutboxSender + attachmentLoader AttachmentLoader + logger *slog.Logger + interval time.Duration + maxRetries int } -func NewOutboxProcessor(db *pgxpool.Pool, sender OutboxSender, interval time.Duration, maxRetries int) *OutboxProcessor { +type OutboxOption func(*OutboxProcessor) + +func WithAttachmentLoader(loader AttachmentLoader) OutboxOption { + return func(p *OutboxProcessor) { + p.attachmentLoader = loader + } +} + +func NewOutboxProcessor(db *pgxpool.Pool, sender OutboxSender, interval time.Duration, maxRetries int, opts ...OutboxOption) *OutboxProcessor { if maxRetries < 1 { maxRetries = DefaultMaxOutboxRetries } - return &OutboxProcessor{ + p := &OutboxProcessor{ db: db, sender: sender, logger: slog.Default().With("component", "outbox"), interval: interval, maxRetries: maxRetries, } + for _, opt := range opts { + opt(p) + } + return p } func (p *OutboxProcessor) Start(ctx context.Context) { @@ -65,7 +82,7 @@ func (p *OutboxProcessor) processQueue(ctx context.Context) { LIMIT 10 FOR UPDATE SKIP LOCKED ) - RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header, retry_count + RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header, attachments, retry_count `) if err != nil { p.logger.Error("failed to query outbox", "error", err) @@ -75,20 +92,21 @@ func (p *OutboxProcessor) processQueue(ctx context.Context) { for rows.Next() { var ( - id string - accountID string - toJSON []byte - ccJSON []byte - bccJSON []byte - subject string - bodyText string - bodyHTML string - inReplyTo string - references []string - retryCount int + id string + accountID string + toJSON []byte + ccJSON []byte + bccJSON []byte + subject string + bodyText string + bodyHTML string + inReplyTo string + references []string + attachmentsJSON []byte + retryCount int ) - if err := rows.Scan(&id, &accountID, &toJSON, &ccJSON, &bccJSON, &subject, &bodyText, &bodyHTML, &inReplyTo, &references, &retryCount); err != nil { + if err := rows.Scan(&id, &accountID, &toJSON, &ccJSON, &bccJSON, &subject, &bodyText, &bodyHTML, &inReplyTo, &references, &attachmentsJSON, &retryCount); err != nil { p.logger.Error("scan outbox row", "error", err) continue } @@ -107,50 +125,41 @@ func (p *OutboxProcessor) processQueue(ctx context.Context) { `, accountID).Scan(&fromEmail); err != nil || fromEmail == "" { if err := p.db.QueryRow(ctx, `SELECT email FROM mail_accounts WHERE id = $1`, accountID).Scan(&fromEmail); err != nil { p.logger.Error("resolve from address", "outbox_id", id, "account_id", accountID, "error", err) + p.markSendFailure(ctx, id, retryCount, fmt.Errorf("resolve from address: %w", err)) continue } } + attachments, err := resolveOutboxAttachments(ctx, p.attachmentLoader, attachmentsJSON) + if err != nil { + p.logger.Error("resolve outbox attachments", "outbox_id", id, "error", err) + p.markSendFailure(ctx, id, retryCount, err) + continue + } + req := &SendRequest{ - AccountID: accountID, - From: fromEmail, - To: to, - Cc: cc, - Bcc: bcc, - Subject: subject, - BodyText: bodyText, - BodyHTML: bodyHTML, - InReplyTo: inReplyTo, - References: references, + AccountID: accountID, + From: fromEmail, + To: to, + Cc: cc, + Bcc: bcc, + Subject: subject, + BodyText: bodyText, + BodyHTML: bodyHTML, + InReplyTo: inReplyTo, + References: references, + Attachments: attachments, } if err := p.sender.Send(ctx, req); err != nil { p.logger.Error("send failed", "outbox_id", id, "error", err) observability.IncOutboxProcessed("error") - nextRetry := time.Now().Add(OutboxRetryDelay(retryCount)) - newRetry := retryCount + 1 - status := "queued" - if newRetry >= p.maxRetries { - status = "failed" - } - if _, execErr := p.db.Exec(ctx, ` - UPDATE outbox SET - status = $2, - retry_count = $3, - next_retry_at = $4, - error = $5, - updated_at = NOW() - WHERE id = $1 - `, id, status, newRetry, nextRetry, err.Error()); execErr != nil { - p.logger.Error("failed to mark outbox retry", "outbox_id", id, "error", execErr) - } + p.markSendFailure(ctx, id, retryCount, err) } else { observability.IncOutboxProcessed("success") - if _, execErr := p.db.Exec(ctx, ` - UPDATE outbox SET status = 'sent', sent_at = NOW(), updated_at = NOW() - WHERE id = $1 - `, id); execErr != nil { - p.logger.Error("failed to mark outbox sent", "outbox_id", id, "error", execErr) + if err := p.persistSentCopyAndMarkSent(ctx, id, accountID, req, attachmentsJSON); err != nil { + p.logger.Error("persist sent copy failed", "outbox_id", id, "error", err) + p.markSentOnly(ctx, id) } } } @@ -186,6 +195,183 @@ func (p *OutboxProcessor) updateQueueDepth(ctx context.Context) { observability.SetOutboxQueueDepth(count) } +func (p *OutboxProcessor) markSendFailure(ctx context.Context, outboxID string, retryCount int, sendErr error) { + newRetry := retryCount + 1 + status := "queued" + var nextRetry any = time.Now().Add(OutboxRetryDelay(retryCount)) + if newRetry >= p.maxRetries { + status = "failed" + nextRetry = nil + if err := p.recordDeadLetter(ctx, outboxID, newRetry, sendErr); err != nil { + p.logger.Error("failed to write dead-letter entry", "outbox_id", outboxID, "error", err) + } + } + if _, err := p.db.Exec(ctx, ` + UPDATE outbox SET + status = $2, + retry_count = $3, + next_retry_at = $4, + error = $5, + updated_at = NOW() + WHERE id = $1 + `, outboxID, status, newRetry, nextRetry, sendErr.Error()); err != nil { + p.logger.Error("failed to mark outbox retry", "outbox_id", outboxID, "error", err) + } +} + +func (p *OutboxProcessor) recordDeadLetter(ctx context.Context, outboxID string, attempt int, sendErr error) error { + _, err := p.db.Exec(ctx, ` + INSERT INTO outbox_dead_letters (outbox_id, attempt_count, error) + VALUES ($1, $2, $3) + `, outboxID, attempt, sendErr.Error()) + return err +} + +func (p *OutboxProcessor) markSentOnly(ctx context.Context, outboxID string) { + if _, err := p.db.Exec(ctx, ` + UPDATE outbox SET status = 'sent', sent_at = NOW(), error = '', updated_at = NOW() + WHERE id = $1 + `, outboxID); err != nil { + p.logger.Error("failed to mark outbox sent", "outbox_id", outboxID, "error", err) + } +} + +func (p *OutboxProcessor) persistSentCopyAndMarkSent( + ctx context.Context, + outboxID, accountID string, + req *SendRequest, + attachmentsJSON []byte, +) error { + tx, err := p.db.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + folderID, err := ensureSentFolder(ctx, tx, accountID) + if err != nil { + return err + } + + uid, err := nextSyntheticUID(ctx, tx, folderID) + if err != nil { + return err + } + + now := time.Now() + fromJSON := marshalAddressesJSON([]string{req.From}) + toJSON := marshalAddressesJSON(req.To) + ccJSON := marshalAddressesJSON(req.Cc) + bccJSON := marshalAddressesJSON(req.Bcc) + snippet := snippetFromBodies(req.BodyText, req.BodyHTML) + msgID := generateMessageID(req.From) + + var messageRowID string + err = tx.QueryRow(ctx, ` + INSERT INTO messages ( + account_id, folder_id, uid, message_id, subject, + from_addr, to_addrs, cc_addrs, bcc_addrs, + date, snippet, body_text, body_html, has_attachments, + in_reply_to, references_header + ) + VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12, $13, $14, $15, $16) + RETURNING id + `, accountID, folderID, uid, msgID, req.Subject, fromJSON, toJSON, ccJSON, bccJSON, + now, snippet, req.BodyText, req.BodyHTML, len(req.Attachments) > 0, req.InReplyTo, req.References, + ).Scan(&messageRowID) + if err != nil { + return err + } + + refs, err := parseOutboxAttachmentsJSON(attachmentsJSON) + if err != nil { + return err + } + for _, ref := range refs { + if _, err := tx.Exec(ctx, ` + INSERT INTO attachments ( + message_id, filename, content_type, size, s3_bucket, s3_key, content_id, is_inline + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + `, messageRowID, ref.Filename, ref.ContentType, ref.Size, ref.S3Bucket, ref.S3Key, ref.ContentID, ref.IsInline); err != nil { + return err + } + } + + if _, err := tx.Exec(ctx, ` + UPDATE outbox SET status = 'sent', sent_at = NOW(), error = '', updated_at = NOW() + WHERE id = $1 + `, outboxID); err != nil { + return err + } + return tx.Commit(ctx) +} + +func ensureSentFolder(ctx context.Context, tx pgx.Tx, accountID string) (string, error) { + var folderID string + err := tx.QueryRow(ctx, ` + SELECT id + FROM mail_folders + WHERE account_id = $1 + AND (folder_type = 'sent' OR lower(name) = 'sent' OR lower(remote_name) = 'sent') + ORDER BY created_at ASC + LIMIT 1 + `, accountID).Scan(&folderID) + if err == nil { + return folderID, nil + } + if !errors.Is(err, pgx.ErrNoRows) { + return "", err + } + err = tx.QueryRow(ctx, ` + INSERT INTO mail_folders (account_id, name, remote_name, folder_type, created_at, updated_at) + VALUES ($1, 'Sent', 'Sent', 'sent', NOW(), NOW()) + ON CONFLICT (account_id, remote_name) DO UPDATE SET updated_at = NOW() + RETURNING id + `, accountID).Scan(&folderID) + if err != nil { + return "", err + } + return folderID, nil +} + +func nextSyntheticUID(ctx context.Context, tx pgx.Tx, folderID string) (int64, error) { + var uid int64 + err := tx.QueryRow(ctx, ` + SELECT COALESCE(MAX(uid), 0) + 1 + FROM messages + WHERE folder_id = $1 + `, folderID).Scan(&uid) + return uid, err +} + +func snippetFromBodies(bodyText, bodyHTML string) string { + text := strings.TrimSpace(bodyText) + if text == "" { + text = strings.TrimSpace(bodyHTML) + } + if len(text) > 200 { + return text[:200] + } + return text +} + +func marshalAddressesJSON(addrs []string) []byte { + items := make([]map[string]string, 0, len(addrs)) + for _, addr := range addrs { + trimmed := strings.TrimSpace(addr) + if trimmed == "" { + continue + } + items = append(items, map[string]string{"address": trimmed}) + } + payload, err := json.Marshal(items) + if err != nil { + return []byte("[]") + } + return payload +} + func parseJSONAddresses(data []byte) []string { var addrs []struct { Address string `json:"address"` diff --git a/internal/mail/smtp/sender.go b/internal/mail/smtp/sender.go index 74c10ec..42e2d6d 100644 --- a/internal/mail/smtp/sender.go +++ b/internal/mail/smtp/sender.go @@ -31,16 +31,17 @@ func NewSender(db *pgxpool.Pool, credManager *credentials.Manager) *Sender { } type SendRequest struct { - AccountID string - From string - To []string - Cc []string - Bcc []string - Subject string - BodyText string - BodyHTML string - InReplyTo string - References []string + AccountID string + From string + To []string + Cc []string + Bcc []string + Subject string + BodyText string + BodyHTML string + InReplyTo string + References []string + Attachments []SendAttachment } func (s *Sender) Send(ctx context.Context, req *SendRequest) error { @@ -87,45 +88,6 @@ func (s *Sender) Send(ctx context.Context, req *SendRequest) error { return nil } -func buildMessage(req *SendRequest) string { - var b strings.Builder - - b.WriteString(fmt.Sprintf("From: %s\r\n", req.From)) - b.WriteString(fmt.Sprintf("To: %s\r\n", strings.Join(req.To, ", "))) - if len(req.Cc) > 0 { - b.WriteString(fmt.Sprintf("Cc: %s\r\n", strings.Join(req.Cc, ", "))) - } - b.WriteString(fmt.Sprintf("Subject: %s\r\n", req.Subject)) - b.WriteString(fmt.Sprintf("Date: %s\r\n", time.Now().Format(time.RFC1123Z))) - b.WriteString(fmt.Sprintf("Message-ID: %s\r\n", generateMessageID(req.From))) - b.WriteString("MIME-Version: 1.0\r\n") - - if req.InReplyTo != "" { - b.WriteString(fmt.Sprintf("In-Reply-To: %s\r\n", req.InReplyTo)) - } - if len(req.References) > 0 { - b.WriteString(fmt.Sprintf("References: %s\r\n", strings.Join(req.References, " "))) - } - - if req.BodyHTML != "" { - boundary := fmt.Sprintf("----=_Part_%d", time.Now().UnixNano()) - b.WriteString(fmt.Sprintf("Content-Type: multipart/alternative; boundary=\"%s\"\r\n", boundary)) - b.WriteString("\r\n") - b.WriteString(fmt.Sprintf("--%s\r\n", boundary)) - b.WriteString("Content-Type: text/plain; charset=UTF-8\r\n\r\n") - b.WriteString(req.BodyText) - b.WriteString(fmt.Sprintf("\r\n--%s\r\n", boundary)) - b.WriteString("Content-Type: text/html; charset=UTF-8\r\n\r\n") - b.WriteString(req.BodyHTML) - b.WriteString(fmt.Sprintf("\r\n--%s--\r\n", boundary)) - } else { - b.WriteString("Content-Type: text/plain; charset=UTF-8\r\n\r\n") - b.WriteString(req.BodyText) - } - - return b.String() -} - func generateMessageID(from string) string { domain := "ultimail.local" if i := strings.LastIndex(from, "@"); i >= 0 && i < len(from)-1 { diff --git a/migrations/000010_outbox_status_dead_letter.down.sql b/migrations/000010_outbox_status_dead_letter.down.sql new file mode 100644 index 0000000..468138d --- /dev/null +++ b/migrations/000010_outbox_status_dead_letter.down.sql @@ -0,0 +1,5 @@ +DROP INDEX IF EXISTS idx_outbox_dead_letters_outbox; +DROP TABLE IF EXISTS outbox_dead_letters; + +ALTER TABLE outbox + DROP CONSTRAINT IF EXISTS outbox_status_valid_chk; diff --git a/migrations/000010_outbox_status_dead_letter.up.sql b/migrations/000010_outbox_status_dead_letter.up.sql new file mode 100644 index 0000000..ac90d94 --- /dev/null +++ b/migrations/000010_outbox_status_dead_letter.up.sql @@ -0,0 +1,17 @@ +ALTER TABLE outbox + DROP CONSTRAINT IF EXISTS outbox_status_valid_chk; + +ALTER TABLE outbox + ADD CONSTRAINT outbox_status_valid_chk + CHECK (status IN ('draft', 'queued', 'scheduled', 'sending', 'sent', 'failed', 'cancelled')) NOT VALID; + +CREATE TABLE IF NOT EXISTS outbox_dead_letters ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + outbox_id UUID NOT NULL REFERENCES outbox(id) ON DELETE CASCADE, + attempt_count INT NOT NULL DEFAULT 0, + error TEXT NOT NULL DEFAULT '', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_outbox_dead_letters_outbox + ON outbox_dead_letters(outbox_id, created_at DESC); diff --git a/project-plan/checklist-execution.md b/project-plan/checklist-execution.md index 7ae8c57..e6652ff 100644 --- a/project-plan/checklist-execution.md +++ b/project-plan/checklist-execution.md @@ -103,11 +103,11 @@ Objectif: transformer état actuel (partiellement implémenté) vers produit fon ### 2.3 SMTP / Outbox / Scheduling -- [ ] Normaliser statuts outbox (`draft`, `queued`, `scheduled`, `sending`, `sent`, `failed`, `cancelled`). -- [ ] Implémenter retries exponentiels + dead-letter strategy. -- [ ] Implémenter "send now", "reschedule", "cancel scheduled". -- [ ] Écrire message envoyé dans dossier Sent (sync cohérente UI). -- [ ] Gérer inline attachments MIME multipart mixed/alternative. +- [x] Normaliser statuts outbox (`draft`, `queued`, `scheduled`, `sending`, `sent`, `failed`, `cancelled`). +- [x] Implémenter retries exponentiels + dead-letter strategy. +- [x] Implémenter "send now", "reschedule", "cancel scheduled". +- [x] Écrire message envoyé dans dossier Sent (sync cohérente UI). +- [x] Gérer inline attachments MIME multipart mixed/alternative. ### 2.4 Rules & Webhooks