Implement outbox management features with scheduling and attachment support

- Added new API endpoints for sending, rescheduling, and canceling scheduled outbox messages.
- Implemented outbox processing logic to handle attachments and manage message statuses.
- Introduced a dead-letter strategy for failed outbox messages, enhancing reliability.
- Updated database schema to support new outbox statuses and dead-letter entries.
- Enhanced unit tests for outbox functionalities, ensuring robust error handling and validation.
- Improved attachment handling in the outbox processor to support inline and regular attachments.
This commit is contained in:
R3D347HR4Y 2026-05-22 17:46:30 +02:00
parent bb5be669c1
commit 65fc9e517a
16 changed files with 1351 additions and 110 deletions

View File

@ -151,7 +151,13 @@ func main() {
sender := smtp.NewSender(pool, credentialManager) sender := smtp.NewSender(pool, credentialManager)
smtpCircuit := smtp.NewCircuitBreaker(cfg.MailSMTPCircuitFailures, cfg.MailSMTPCircuitCooldown) smtpCircuit := smtp.NewCircuitBreaker(cfg.MailSMTPCircuitFailures, cfg.MailSMTPCircuitCooldown)
guardedSender := smtp.NewGuardedSender(sender, smtpCircuit) guardedSender := smtp.NewGuardedSender(sender, smtpCircuit)
go smtp.NewOutboxProcessor(pool, guardedSender, cfg.MailOutboxInterval, cfg.MailOutboxMaxRetries).Start(ctx) go smtp.NewOutboxProcessor(
pool,
guardedSender,
cfg.MailOutboxInterval,
cfg.MailOutboxMaxRetries,
smtp.WithAttachmentLoader(&smtp.StorageAttachmentLoader{Client: attachmentStorage}),
).Start(ctx)
sendRateLimiter := sendguard.NewRateLimiter(cfg.MailSendRatePerMinute, cfg.MailSendBurst) sendRateLimiter := sendguard.NewRateLimiter(cfg.MailSendRatePerMinute, cfg.MailSendBurst)

View File

@ -10,11 +10,11 @@ import (
"github.com/ultisuite/ulti-backend/internal/api/apiresponse" "github.com/ultisuite/ulti-backend/internal/api/apiresponse"
"github.com/ultisuite/ulti-backend/internal/api/apivalidate" "github.com/ultisuite/ulti-backend/internal/api/apivalidate"
"github.com/ultisuite/ulti-backend/internal/api/mail/sendguard"
"github.com/ultisuite/ulti-backend/internal/api/middleware" "github.com/ultisuite/ulti-backend/internal/api/middleware"
"github.com/ultisuite/ulti-backend/internal/api/query" "github.com/ultisuite/ulti-backend/internal/api/query"
"github.com/ultisuite/ulti-backend/internal/mail/credentials" "github.com/ultisuite/ulti-backend/internal/mail/credentials"
"github.com/ultisuite/ulti-backend/internal/mail/limits" "github.com/ultisuite/ulti-backend/internal/mail/limits"
"github.com/ultisuite/ulti-backend/internal/api/mail/sendguard"
"github.com/ultisuite/ulti-backend/internal/mail/storage" "github.com/ultisuite/ulti-backend/internal/mail/storage"
"github.com/ultisuite/ulti-backend/internal/securityaudit" "github.com/ultisuite/ulti-backend/internal/securityaudit"
) )
@ -88,6 +88,10 @@ func (h *Handler) Routes() chi.Router {
r.Post("/send", h.SendMessage) r.Post("/send", h.SendMessage)
r.Post("/outbox/{outboxID}/send-now", h.SendOutboxNow)
r.Post("/outbox/{outboxID}/reschedule", h.RescheduleOutbox)
r.Post("/outbox/{outboxID}/cancel", h.CancelScheduledOutbox)
r.Get("/rules", h.ListRules) r.Get("/rules", h.ListRules)
r.Post("/rules", h.CreateRule) r.Post("/rules", h.CreateRule)
r.Put("/rules/{ruleID}", h.UpdateRule) r.Put("/rules/{ruleID}", h.UpdateRule)

View File

@ -0,0 +1,97 @@
package mail
import (
"errors"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/ultisuite/ulti-backend/internal/api/apiresponse"
"github.com/ultisuite/ulti-backend/internal/api/apivalidate"
"github.com/ultisuite/ulti-backend/internal/api/middleware"
)
func (h *Handler) SendOutboxNow(w http.ResponseWriter, r *http.Request) {
claims := middleware.ClaimsFromContext(r.Context())
userID, err := h.svc.ResolveUserID(r.Context(), claims.Sub)
if err != nil {
h.writeUserResolveError(w, r, err)
return
}
outboxID := chi.URLParam(r, "outboxID")
status, err := h.svc.SendOutboxNow(r.Context(), userID, outboxID)
if err != nil {
if h.writeOutboxActionError(w, r, err) {
return
}
h.logger.Error("send outbox now", "error", err, "outbox_id", outboxID)
apivalidate.WriteInternal(w, r)
return
}
apiresponse.WriteJSON(w, http.StatusAccepted, map[string]string{"id": outboxID, "status": status})
}
func (h *Handler) RescheduleOutbox(w http.ResponseWriter, r *http.Request) {
claims := middleware.ClaimsFromContext(r.Context())
userID, err := h.svc.ResolveUserID(r.Context(), claims.Sub)
if err != nil {
h.writeUserResolveError(w, r, err)
return
}
var req rescheduleOutboxRequest
if err := apivalidate.DecodeJSON(w, r, maxOutboxScheduleBody, &req); err != nil {
return
}
scheduledAt, verr := validateRescheduleOutbox(&req)
if verr != nil {
apivalidate.WriteValidationError(w, r, verr)
return
}
outboxID := chi.URLParam(r, "outboxID")
status, err := h.svc.RescheduleOutbox(r.Context(), userID, outboxID, *scheduledAt)
if err != nil {
if h.writeOutboxActionError(w, r, err) {
return
}
h.logger.Error("reschedule outbox", "error", err, "outbox_id", outboxID)
apivalidate.WriteInternal(w, r)
return
}
apiresponse.WriteJSON(w, http.StatusOK, map[string]string{"id": outboxID, "status": status})
}
func (h *Handler) CancelScheduledOutbox(w http.ResponseWriter, r *http.Request) {
claims := middleware.ClaimsFromContext(r.Context())
userID, err := h.svc.ResolveUserID(r.Context(), claims.Sub)
if err != nil {
h.writeUserResolveError(w, r, err)
return
}
outboxID := chi.URLParam(r, "outboxID")
status, err := h.svc.CancelScheduledOutbox(r.Context(), userID, outboxID)
if err != nil {
if h.writeOutboxActionError(w, r, err) {
return
}
h.logger.Error("cancel scheduled outbox", "error", err, "outbox_id", outboxID)
apivalidate.WriteInternal(w, r)
return
}
apiresponse.WriteJSON(w, http.StatusOK, map[string]string{"id": outboxID, "status": status})
}
func (h *Handler) writeOutboxActionError(w http.ResponseWriter, r *http.Request, err error) bool {
if errors.Is(err, ErrNotFound) {
apivalidate.WriteNotFound(w, r, "not found")
return true
}
if errors.Is(err, ErrInvalidOutboxStatus) {
apiresponse.WriteError(w, r, http.StatusConflict, apiresponse.CodeInvalidRequest, "invalid outbox status", nil)
return true
}
return false
}

View File

@ -8,6 +8,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
@ -129,6 +130,78 @@ func (f *fakeMailService) SendMessage(_ context.Context, userID string, req *sen
return "outbox-1", "queued", nil return "outbox-1", "queued", nil
} }
func (f *fakeMailService) SendOutboxNow(context.Context, string, string) (string, error) {
return "", ErrNotFound
}
func (f *fakeMailService) RescheduleOutbox(context.Context, string, string, time.Time) (string, error) {
return "", ErrNotFound
}
func (f *fakeMailService) CancelScheduledOutbox(context.Context, string, string) (string, error) {
return "", ErrNotFound
}
type outboxFakeService struct {
fakeMailService
items map[string]string
}
func newOutboxFakeService() *outboxFakeService {
return &outboxFakeService{
fakeMailService: *newFakeMailService(),
items: map[string]string{
"outbox-scheduled": "scheduled",
"outbox-queued": "queued",
},
}
}
func (f *outboxFakeService) SendOutboxNow(_ context.Context, userID, outboxID string) (string, error) {
if userID != testUserID {
return "", ErrNotFound
}
status, ok := f.items[outboxID]
if !ok {
return "", ErrNotFound
}
if status != "scheduled" {
return "", ErrInvalidOutboxStatus
}
f.items[outboxID] = "queued"
return "queued", nil
}
func (f *outboxFakeService) RescheduleOutbox(_ context.Context, userID, outboxID string, _ time.Time) (string, error) {
if userID != testUserID {
return "", ErrNotFound
}
status, ok := f.items[outboxID]
if !ok {
return "", ErrNotFound
}
if status != "scheduled" && status != "queued" {
return "", ErrInvalidOutboxStatus
}
f.items[outboxID] = "scheduled"
return "scheduled", nil
}
func (f *outboxFakeService) CancelScheduledOutbox(_ context.Context, userID, outboxID string) (string, error) {
if userID != testUserID {
return "", ErrNotFound
}
status, ok := f.items[outboxID]
if !ok {
return "", ErrNotFound
}
if status != "scheduled" {
return "", ErrInvalidOutboxStatus
}
f.items[outboxID] = "cancelled"
return "cancelled", nil
}
func (f *fakeMailService) ListDrafts(context.Context, string, query.ListParams) (DraftsList, error) { func (f *fakeMailService) ListDrafts(context.Context, string, query.ListParams) (DraftsList, error) {
return DraftsList{}, nil return DraftsList{}, nil
} }
@ -568,3 +641,158 @@ func TestDeleteMessage(t *testing.T) {
t.Fatalf("get after delete status = %d, want %d", rec.Code, http.StatusNotFound) t.Fatalf("get after delete status = %d, want %d", rec.Code, http.StatusNotFound)
} }
} }
func TestSendOutboxNow(t *testing.T) {
svc := newOutboxFakeService()
router := newTestMailRouter(svc)
t.Run("happy path", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-scheduled/send-now", nil)
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusAccepted {
t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusAccepted, rec.Body.String())
}
var resp map[string]string
if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
t.Fatalf("decode body: %v", err)
}
if resp["id"] != "outbox-scheduled" || resp["status"] != "queued" {
t.Fatalf("response = %#v", resp)
}
if svc.items["outbox-scheduled"] != "queued" {
t.Fatalf("item status = %q, want queued", svc.items["outbox-scheduled"])
}
})
t.Run("invalid status", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-queued/send-now", nil)
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusConflict {
t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusConflict, rec.Body.String())
}
})
t.Run("not found", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/outbox/missing/send-now", nil)
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusNotFound {
t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusNotFound, rec.Body.String())
}
})
}
func TestRescheduleOutbox(t *testing.T) {
svc := newOutboxFakeService()
router := newTestMailRouter(svc)
future := time.Now().Add(2 * time.Hour).UTC().Format(time.RFC3339)
t.Run("happy path", func(t *testing.T) {
body, err := json.Marshal(map[string]string{"schedule_at": future})
if err != nil {
t.Fatalf("marshal payload: %v", err)
}
req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-scheduled/reschedule", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusOK, rec.Body.String())
}
var resp map[string]string
if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
t.Fatalf("decode body: %v", err)
}
if resp["status"] != "scheduled" {
t.Fatalf("response = %#v", resp)
}
})
t.Run("invalid schedule_at", func(t *testing.T) {
body, err := json.Marshal(map[string]string{"schedule_at": "not-a-date"})
if err != nil {
t.Fatalf("marshal payload: %v", err)
}
req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-scheduled/reschedule", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusBadRequest {
t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusBadRequest, rec.Body.String())
}
})
t.Run("invalid status", func(t *testing.T) {
svc.items["outbox-sent"] = "sent"
body, err := json.Marshal(map[string]string{"schedule_at": future})
if err != nil {
t.Fatalf("marshal payload: %v", err)
}
req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-sent/reschedule", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusConflict {
t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusConflict, rec.Body.String())
}
})
}
func TestCancelScheduledOutbox(t *testing.T) {
svc := newOutboxFakeService()
router := newTestMailRouter(svc)
t.Run("happy path", func(t *testing.T) {
svc.items["outbox-cancel-me"] = "scheduled"
req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-cancel-me/cancel", nil)
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusOK, rec.Body.String())
}
var resp map[string]string
if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
t.Fatalf("decode body: %v", err)
}
if resp["status"] != "cancelled" {
t.Fatalf("response = %#v", resp)
}
})
t.Run("invalid status", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/outbox/outbox-queued/cancel", nil)
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusConflict {
t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusConflict, rec.Body.String())
}
})
t.Run("not found", func(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/outbox/missing/cancel", nil)
rec := httptest.NewRecorder()
router.ServeHTTP(rec, req)
if rec.Code != http.StatusNotFound {
t.Fatalf("status = %d, want %d; body = %s", rec.Code, http.StatusNotFound, rec.Body.String())
}
})
}

View File

@ -0,0 +1,73 @@
package mail
import (
"context"
"errors"
"time"
"github.com/jackc/pgx/v5"
)
var ErrInvalidOutboxStatus = errors.New("invalid outbox status")
func (s *Service) SendOutboxNow(ctx context.Context, userID, outboxID string) (string, error) {
var status string
err := s.db.QueryRow(ctx, `
UPDATE outbox SET status = 'queued', scheduled_at = NULL, updated_at = NOW()
WHERE id = $1 AND user_id = $2 AND status = 'scheduled'
RETURNING status
`, outboxID, userID).Scan(&status)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return "", s.outboxActionError(ctx, userID, outboxID)
}
return "", err
}
return status, nil
}
func (s *Service) RescheduleOutbox(ctx context.Context, userID, outboxID string, scheduledAt time.Time) (string, error) {
var status string
err := s.db.QueryRow(ctx, `
UPDATE outbox SET status = 'scheduled', scheduled_at = $3, updated_at = NOW()
WHERE id = $1 AND user_id = $2 AND status IN ('scheduled', 'queued')
RETURNING status
`, outboxID, userID, scheduledAt).Scan(&status)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return "", s.outboxActionError(ctx, userID, outboxID)
}
return "", err
}
return status, nil
}
func (s *Service) CancelScheduledOutbox(ctx context.Context, userID, outboxID string) (string, error) {
var status string
err := s.db.QueryRow(ctx, `
UPDATE outbox SET status = 'cancelled', scheduled_at = NULL, updated_at = NOW()
WHERE id = $1 AND user_id = $2 AND status = 'scheduled'
RETURNING status
`, outboxID, userID).Scan(&status)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return "", s.outboxActionError(ctx, userID, outboxID)
}
return "", err
}
return status, nil
}
func (s *Service) outboxActionError(ctx context.Context, userID, outboxID string) error {
var status string
err := s.db.QueryRow(ctx, `
SELECT status FROM outbox WHERE id = $1 AND user_id = $2
`, outboxID, userID).Scan(&status)
if errors.Is(err, pgx.ErrNoRows) {
return ErrNotFound
}
if err != nil {
return err
}
return ErrInvalidOutboxStatus
}

View File

@ -3,6 +3,7 @@ package mail
import ( import (
"context" "context"
"io" "io"
"time"
"github.com/ultisuite/ulti-backend/internal/api/query" "github.com/ultisuite/ulti-backend/internal/api/query"
) )
@ -21,6 +22,9 @@ type ServiceAPI interface {
DeleteMessage(ctx context.Context, externalID, messageID string) error DeleteMessage(ctx context.Context, externalID, messageID string) error
GetThread(ctx context.Context, externalID, threadID string) (map[string]any, error) GetThread(ctx context.Context, externalID, threadID string) (map[string]any, error)
SendMessage(ctx context.Context, userID string, req *sendMessageRequest) (id, status string, err error) SendMessage(ctx context.Context, userID string, req *sendMessageRequest) (id, status string, err error)
SendOutboxNow(ctx context.Context, userID, outboxID string) (status string, err error)
RescheduleOutbox(ctx context.Context, userID, outboxID string, scheduledAt time.Time) (status string, err error)
CancelScheduledOutbox(ctx context.Context, userID, outboxID string) (status string, err error)
ListDrafts(ctx context.Context, externalID string, params query.ListParams) (DraftsList, error) ListDrafts(ctx context.Context, externalID string, params query.ListParams) (DraftsList, error)
GetDraft(ctx context.Context, externalID, draftID string) (map[string]any, error) GetDraft(ctx context.Context, externalID, draftID string) (map[string]any, error)
CreateDraft(ctx context.Context, userID string, req *draftRequest) (string, error) CreateDraft(ctx context.Context, userID string, req *draftRequest) (string, error)

View File

@ -8,6 +8,7 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"time"
"unicode" "unicode"
"github.com/ultisuite/ulti-backend/internal/api/apivalidate" "github.com/ultisuite/ulti-backend/internal/api/apivalidate"
@ -19,14 +20,15 @@ const (
maxWebhookRequestBody = 128 << 10 // 128 KiB maxWebhookRequestBody = 128 << 10 // 128 KiB
maxRulesRequestBody = 256 << 10 // 256 KiB maxRulesRequestBody = 256 << 10 // 256 KiB
maxFlagsLabelsBody = 32 << 10 // 32 KiB maxFlagsLabelsBody = 32 << 10 // 32 KiB
maxOutboxScheduleBody = 4 << 10 // 4 KiB
maxWebhookBodyTemplate = 64 << 10 // 64 KiB maxWebhookBodyTemplate = 64 << 10 // 64 KiB
maxWebhookHeaders = 20 maxWebhookHeaders = 20
maxHeaderNameLen = 256 maxHeaderNameLen = 256
maxHeaderValueLen = 8192 maxHeaderValueLen = 8192
maxSubjectLen = 998 maxSubjectLen = 998
maxEmailLen = 320 maxEmailLen = 320
maxHostLen = 253 maxHostLen = 253
maxAccountName = 128 maxAccountName = 128
maxUsernameLen = 256 maxUsernameLen = 256
@ -193,6 +195,25 @@ type sendMessageRequest struct {
IdempotencyKey string `json:"-"` IdempotencyKey string `json:"-"`
} }
type rescheduleOutboxRequest struct {
ScheduleAt string `json:"schedule_at"`
}
func validateRescheduleOutbox(req *rescheduleOutboxRequest) (*time.Time, *apivalidate.ValidationError) {
scheduleAt := strings.TrimSpace(req.ScheduleAt)
if scheduleAt == "" {
return nil, apivalidate.NewValidationError(apivalidate.FieldDetail{Field: "schedule_at", Message: "required"})
}
parsed, err := time.Parse(time.RFC3339, scheduleAt)
if err != nil {
return nil, apivalidate.NewValidationError(apivalidate.FieldDetail{Field: "schedule_at", Message: "invalid"})
}
if !parsed.After(time.Now()) {
return nil, apivalidate.NewValidationError(apivalidate.FieldDetail{Field: "schedule_at", Message: "must be in the future"})
}
return &parsed, nil
}
func validateSendMessage(req *sendMessageRequest) *apivalidate.ValidationError { func validateSendMessage(req *sendMessageRequest) *apivalidate.ValidationError {
var details []apivalidate.FieldDetail var details []apivalidate.FieldDetail
if strings.TrimSpace(req.AccountID) == "" { if strings.TrimSpace(req.AccountID) == "" {

View File

@ -0,0 +1,92 @@
package smtp
import (
"context"
"encoding/json"
"fmt"
"io"
"github.com/ultisuite/ulti-backend/internal/mail/storage"
)
// SendAttachment is a MIME body part ready for embedding in an outbound message.
type SendAttachment struct {
Filename string
ContentType string
ContentID string
IsInline bool
Data []byte
}
// AttachmentLoader fetches attachment bytes referenced by outbox JSON.
type AttachmentLoader interface {
Load(ctx context.Context, s3Key string) ([]byte, error)
}
// StorageAttachmentLoader loads attachment bytes from object storage.
type StorageAttachmentLoader struct {
Client *storage.Client
}
func (l *StorageAttachmentLoader) Load(ctx context.Context, s3Key string) ([]byte, error) {
if l == nil || l.Client == nil {
return nil, fmt.Errorf("object storage not configured")
}
obj, err := l.Client.Get(ctx, s3Key)
if err != nil {
return nil, err
}
defer obj.Close()
return io.ReadAll(obj)
}
type outboxAttachmentRef struct {
ID string `json:"id"`
Filename string `json:"filename"`
ContentType string `json:"content_type"`
Size int64 `json:"size"`
S3Bucket string `json:"s3_bucket"`
S3Key string `json:"s3_key"`
ContentID string `json:"content_id,omitempty"`
IsInline bool `json:"is_inline"`
}
func parseOutboxAttachmentsJSON(data []byte) ([]outboxAttachmentRef, error) {
if len(data) == 0 || string(data) == "[]" {
return nil, nil
}
var refs []outboxAttachmentRef
if err := json.Unmarshal(data, &refs); err != nil {
return nil, fmt.Errorf("parse outbox attachments: %w", err)
}
return refs, nil
}
func resolveOutboxAttachments(ctx context.Context, loader AttachmentLoader, data []byte) ([]SendAttachment, error) {
refs, err := parseOutboxAttachmentsJSON(data)
if err != nil {
return nil, err
}
if len(refs) == 0 {
return nil, nil
}
if loader == nil {
return nil, fmt.Errorf("attachment loader not configured")
}
attachments := make([]SendAttachment, 0, len(refs))
for _, ref := range refs {
payload, err := loader.Load(ctx, ref.S3Key)
if err != nil {
return nil, fmt.Errorf("load attachment %q: %w", ref.Filename, err)
}
attachments = append(attachments, SendAttachment{
Filename: ref.Filename,
ContentType: ref.ContentType,
ContentID: ref.ContentID,
IsInline: ref.IsInline,
Data: payload,
})
}
return attachments, nil
}

View File

@ -0,0 +1,92 @@
package smtp
import (
"context"
"errors"
"reflect"
"testing"
)
func TestParseOutboxAttachmentsJSON(t *testing.T) {
data := []byte(`[{
"id":"att-1",
"filename":"doc.pdf",
"content_type":"application/pdf",
"size":12,
"s3_bucket":"mail-attachments",
"s3_key":"user/drafts/id/doc.pdf",
"content_id":"logo@ultimail",
"is_inline":true
}]`)
refs, err := parseOutboxAttachmentsJSON(data)
if err != nil {
t.Fatalf("parseOutboxAttachmentsJSON() error = %v", err)
}
if len(refs) != 1 {
t.Fatalf("len(refs) = %d, want 1", len(refs))
}
if refs[0].Filename != "doc.pdf" || !refs[0].IsInline || refs[0].ContentID != "logo@ultimail" {
t.Fatalf("unexpected ref: %+v", refs[0])
}
}
func TestParseOutboxAttachmentsJSON_empty(t *testing.T) {
for _, data := range [][]byte{nil, []byte{}, []byte("[]")} {
refs, err := parseOutboxAttachmentsJSON(data)
if err != nil {
t.Fatalf("parseOutboxAttachmentsJSON(%q) error = %v", data, err)
}
if refs != nil {
t.Fatalf("parseOutboxAttachmentsJSON(%q) = %v, want nil", data, refs)
}
}
}
type stubAttachmentLoader struct {
data map[string][]byte
err error
}
func (s stubAttachmentLoader) Load(_ context.Context, s3Key string) ([]byte, error) {
if s.err != nil {
return nil, s.err
}
payload, ok := s.data[s3Key]
if !ok {
return nil, errors.New("missing object")
}
return payload, nil
}
func TestResolveOutboxAttachments(t *testing.T) {
data := []byte(`[{
"filename":"note.txt",
"content_type":"text/plain",
"s3_key":"drafts/note.txt"
}]`)
loader := stubAttachmentLoader{data: map[string][]byte{
"drafts/note.txt": []byte("hello"),
}}
got, err := resolveOutboxAttachments(context.Background(), loader, data)
if err != nil {
t.Fatalf("resolveOutboxAttachments() error = %v", err)
}
want := []SendAttachment{{
Filename: "note.txt",
ContentType: "text/plain",
Data: []byte("hello"),
}}
if !reflect.DeepEqual(got, want) {
t.Fatalf("resolveOutboxAttachments() = %+v, want %+v", got, want)
}
}
func TestResolveOutboxAttachments_requiresLoader(t *testing.T) {
data := []byte(`[{"filename":"note.txt","s3_key":"drafts/note.txt"}]`)
_, err := resolveOutboxAttachments(context.Background(), nil, data)
if err == nil {
t.Fatal("expected error when loader is nil")
}
}

187
internal/mail/smtp/mime.go Normal file
View File

@ -0,0 +1,187 @@
package smtp
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"mime"
"mime/multipart"
"net/textproto"
"strings"
"time"
)
func buildMessage(req *SendRequest) string {
if len(req.Attachments) == 0 {
return buildMessageWithoutAttachments(req)
}
return buildMessageWithAttachments(req)
}
func buildMessageWithoutAttachments(req *SendRequest) string {
var b strings.Builder
writeCommonHeaders(&b, req)
if req.BodyHTML != "" {
boundary := fmt.Sprintf("----=_Part_%d", time.Now().UnixNano())
b.WriteString(fmt.Sprintf("Content-Type: multipart/alternative; boundary=\"%s\"\r\n", boundary))
b.WriteString("\r\n")
b.WriteString(fmt.Sprintf("--%s\r\n", boundary))
b.WriteString("Content-Type: text/plain; charset=UTF-8\r\n\r\n")
b.WriteString(req.BodyText)
b.WriteString(fmt.Sprintf("\r\n--%s\r\n", boundary))
b.WriteString("Content-Type: text/html; charset=UTF-8\r\n\r\n")
b.WriteString(req.BodyHTML)
b.WriteString(fmt.Sprintf("\r\n--%s--\r\n", boundary))
} else {
b.WriteString("Content-Type: text/plain; charset=UTF-8\r\n\r\n")
b.WriteString(req.BodyText)
}
return b.String()
}
func buildMessageWithAttachments(req *SendRequest) string {
body := &bytes.Buffer{}
mixed := multipart.NewWriter(body)
_ = writeMessageBodyPart(mixed, req.BodyText, req.BodyHTML)
for _, att := range req.Attachments {
_ = writeAttachmentPart(mixed, att)
}
_ = mixed.Close()
var head strings.Builder
writeCommonHeaders(&head, req)
head.WriteString(fmt.Sprintf("Content-Type: multipart/mixed; boundary=\"%s\"\r\n\r\n", mixed.Boundary()))
head.Write(body.Bytes())
return head.String()
}
func writeCommonHeaders(b *strings.Builder, req *SendRequest) {
b.WriteString(fmt.Sprintf("From: %s\r\n", req.From))
b.WriteString(fmt.Sprintf("To: %s\r\n", strings.Join(req.To, ", ")))
if len(req.Cc) > 0 {
b.WriteString(fmt.Sprintf("Cc: %s\r\n", strings.Join(req.Cc, ", ")))
}
b.WriteString(fmt.Sprintf("Subject: %s\r\n", req.Subject))
b.WriteString(fmt.Sprintf("Date: %s\r\n", time.Now().Format(time.RFC1123Z)))
b.WriteString(fmt.Sprintf("Message-ID: %s\r\n", generateMessageID(req.From)))
b.WriteString("MIME-Version: 1.0\r\n")
if req.InReplyTo != "" {
b.WriteString(fmt.Sprintf("In-Reply-To: %s\r\n", req.InReplyTo))
}
if len(req.References) > 0 {
b.WriteString(fmt.Sprintf("References: %s\r\n", strings.Join(req.References, " ")))
}
}
func writeMessageBodyPart(w *multipart.Writer, text, html string) error {
if html != "" {
altBuf := &bytes.Buffer{}
alt := multipart.NewWriter(altBuf)
if err := writeTextPart(alt, "text/plain; charset=UTF-8", text); err != nil {
return err
}
if err := writeTextPart(alt, "text/html; charset=UTF-8", html); err != nil {
return err
}
if err := alt.Close(); err != nil {
return err
}
h := textproto.MIMEHeader{}
h.Set("Content-Type", fmt.Sprintf("multipart/alternative; boundary=\"%s\"", alt.Boundary()))
part, err := w.CreatePart(h)
if err != nil {
return err
}
_, err = part.Write(altBuf.Bytes())
return err
}
h := textproto.MIMEHeader{}
h.Set("Content-Type", "text/plain; charset=UTF-8")
part, err := w.CreatePart(h)
if err != nil {
return err
}
_, err = part.Write([]byte(text))
return err
}
func writeTextPart(w *multipart.Writer, contentType, body string) error {
h := textproto.MIMEHeader{}
h.Set("Content-Type", contentType)
part, err := w.CreatePart(h)
if err != nil {
return err
}
_, err = part.Write([]byte(body))
return err
}
func writeAttachmentPart(w *multipart.Writer, att SendAttachment) error {
contentType := att.ContentType
if contentType == "" {
contentType = "application/octet-stream"
}
h := textproto.MIMEHeader{}
if att.Filename != "" {
h.Set("Content-Type", mime.FormatMediaType(contentType, map[string]string{
"name": att.Filename,
}))
} else {
h.Set("Content-Type", contentType)
}
h.Set("Content-Transfer-Encoding", "base64")
disposition := "attachment"
if att.IsInline {
disposition = "inline"
}
if att.Filename != "" {
h.Set("Content-Disposition", mime.FormatMediaType(disposition, map[string]string{
"filename": att.Filename,
}))
} else {
h.Set("Content-Disposition", disposition)
}
if att.IsInline && att.ContentID != "" {
cid := att.ContentID
if !strings.HasPrefix(cid, "<") {
cid = "<" + cid + ">"
}
h.Set("Content-ID", cid)
}
part, err := w.CreatePart(h)
if err != nil {
return err
}
return writeBase64(part, att.Data)
}
func writeBase64(w io.Writer, data []byte) error {
encoded := base64.StdEncoding.EncodeToString(data)
for i := 0; i < len(encoded); i += 76 {
end := i + 76
if end > len(encoded) {
end = len(encoded)
}
if _, err := io.WriteString(w, encoded[i:end]); err != nil {
return err
}
if end < len(encoded) {
if _, err := io.WriteString(w, "\r\n"); err != nil {
return err
}
}
}
return nil
}

View File

@ -0,0 +1,267 @@
package smtp
import (
"bytes"
"encoding/base64"
"io"
"mime"
"mime/multipart"
"net/mail"
"strings"
"testing"
)
func TestBuildMessage_plainOnly(t *testing.T) {
msg := buildMessage(&SendRequest{
From: "alice@example.com",
To: []string{"bob@example.com"},
Subject: "Hello",
BodyText: "Plain body",
})
if strings.Contains(msg, "multipart/") {
t.Fatal("plain-only message must not be multipart")
}
if !strings.Contains(msg, "Content-Type: text/plain; charset=UTF-8") {
t.Fatal("missing text/plain content type")
}
if !strings.Contains(msg, "Plain body") {
t.Fatal("missing body text")
}
}
func TestBuildMessage_alternativeOnly(t *testing.T) {
msg := buildMessage(&SendRequest{
From: "alice@example.com",
To: []string{"bob@example.com"},
Subject: "Hello",
BodyText: "Plain body",
BodyHTML: "<p>HTML body</p>",
})
if !strings.Contains(msg, "Content-Type: multipart/alternative") {
t.Fatal("missing multipart/alternative top-level content type")
}
if strings.Contains(msg, "multipart/mixed") {
t.Fatal("alternative-only message must not be multipart/mixed")
}
if !strings.Contains(msg, "Content-Type: text/html; charset=UTF-8") {
t.Fatal("missing text/html part")
}
}
func TestBuildMessage_mixedWithPlainAttachment(t *testing.T) {
payload := []byte("file-bytes")
msg := buildMessage(&SendRequest{
From: "alice@example.com",
To: []string{"bob@example.com"},
Subject: "With file",
BodyText: "See attached",
Attachments: []SendAttachment{{
Filename: "doc.pdf",
ContentType: "application/pdf",
Data: payload,
}},
})
parsed, err := mail.ReadMessage(strings.NewReader(msg))
if err != nil {
t.Fatalf("ReadMessage: %v", err)
}
mediaType, params, err := mime.ParseMediaType(parsed.Header.Get("Content-Type"))
if err != nil {
t.Fatalf("ParseMediaType: %v", err)
}
if mediaType != "multipart/mixed" {
t.Fatalf("top-level content type = %q, want multipart/mixed", mediaType)
}
parts := readAllParts(t, parsed.Body, params["boundary"])
if len(parts) != 2 {
t.Fatalf("part count = %d, want 2", len(parts))
}
if got := parts[0].Header.Get("Content-Type"); got != "text/plain; charset=UTF-8" {
t.Fatalf("body part content type = %q", got)
}
if string(parts[0].Body) != "See attached" {
t.Fatalf("body part = %q", parts[0].Body)
}
attType, attParams, err := mime.ParseMediaType(parts[1].Header.Get("Content-Type"))
if err != nil {
t.Fatalf("ParseMediaType attachment: %v", err)
}
if attType != "application/pdf" {
t.Fatalf("attachment content type = %q", attType)
}
if attParams["name"] != "doc.pdf" {
t.Fatalf("attachment name = %q", attParams["name"])
}
if parts[1].Header.Get("Content-Transfer-Encoding") != "base64" {
t.Fatal("attachment missing base64 transfer encoding")
}
disp, dispParams, err := mime.ParseMediaType(parts[1].Header.Get("Content-Disposition"))
if err != nil {
t.Fatalf("ParseMediaType disposition: %v", err)
}
if disp != "attachment" {
t.Fatalf("disposition = %q, want attachment", disp)
}
if dispParams["filename"] != "doc.pdf" {
t.Fatalf("filename = %q", dispParams["filename"])
}
if !bytes.Equal(decodeBase64Part(t, parts[1].Body), payload) {
t.Fatal("attachment payload mismatch")
}
}
func TestBuildMessage_mixedWithAlternativeAndAttachment(t *testing.T) {
msg := buildMessage(&SendRequest{
From: "alice@example.com",
To: []string{"bob@example.com"},
Subject: "Rich mail",
BodyText: "Plain",
BodyHTML: "<b>HTML</b>",
Attachments: []SendAttachment{{
Filename: "note.txt",
ContentType: "text/plain",
Data: []byte("attachment"),
}},
})
parsed, err := mail.ReadMessage(strings.NewReader(msg))
if err != nil {
t.Fatalf("ReadMessage: %v", err)
}
mediaType, params, err := mime.ParseMediaType(parsed.Header.Get("Content-Type"))
if err != nil {
t.Fatalf("ParseMediaType: %v", err)
}
if mediaType != "multipart/mixed" {
t.Fatalf("top-level content type = %q, want multipart/mixed", mediaType)
}
parts := readAllParts(t, parsed.Body, params["boundary"])
if len(parts) != 2 {
t.Fatalf("part count = %d, want 2", len(parts))
}
altType, altParams, err := mime.ParseMediaType(parts[0].Header.Get("Content-Type"))
if err != nil {
t.Fatalf("ParseMediaType alternative: %v", err)
}
if altType != "multipart/alternative" {
t.Fatalf("first part type = %q, want multipart/alternative", altType)
}
altParts := readAllParts(t, bytes.NewReader(parts[0].Body), altParams["boundary"])
if len(altParts) != 2 {
t.Fatalf("alternative part count = %d, want 2", len(altParts))
}
if string(altParts[0].Body) != "Plain" {
t.Fatalf("plain part = %q", altParts[0].Body)
}
if string(altParts[1].Body) != "<b>HTML</b>" {
t.Fatalf("html part = %q", altParts[1].Body)
}
disp, _, err := mime.ParseMediaType(parts[1].Header.Get("Content-Disposition"))
if err != nil {
t.Fatalf("ParseMediaType disposition: %v", err)
}
if disp != "attachment" {
t.Fatalf("attachment disposition = %q", disp)
}
}
func TestBuildMessage_inlineAttachmentHeaders(t *testing.T) {
msg := buildMessage(&SendRequest{
From: "alice@example.com",
To: []string{"bob@example.com"},
Subject: "Inline",
BodyHTML: `<img src="cid:logo@ultimail">`,
BodyText: "Logo",
Attachments: []SendAttachment{{
Filename: "logo.png",
ContentType: "image/png",
ContentID: "logo@ultimail",
IsInline: true,
Data: []byte{0x89, 0x50, 0x4e, 0x47},
}},
})
parsed, err := mail.ReadMessage(strings.NewReader(msg))
if err != nil {
t.Fatalf("ReadMessage: %v", err)
}
_, params, err := mime.ParseMediaType(parsed.Header.Get("Content-Type"))
if err != nil {
t.Fatalf("ParseMediaType: %v", err)
}
parts := readAllParts(t, parsed.Body, params["boundary"])
if len(parts) != 2 {
t.Fatalf("part count = %d, want 2", len(parts))
}
inline := parts[1]
disp, _, err := mime.ParseMediaType(inline.Header.Get("Content-Disposition"))
if err != nil {
t.Fatalf("ParseMediaType disposition: %v", err)
}
if disp != "inline" {
t.Fatalf("disposition = %q, want inline", disp)
}
if inline.Header.Get("Content-ID") != "<logo@ultimail>" {
t.Fatalf("content-id = %q", inline.Header.Get("Content-ID"))
}
}
type mimePart struct {
Header mail.Header
Body []byte
}
func readAllParts(t *testing.T, r io.Reader, boundary string) []mimePart {
t.Helper()
mr := multipart.NewReader(r, boundary)
var parts []mimePart
for {
part, err := mr.NextPart()
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("NextPart: %v", err)
}
body, err := io.ReadAll(part)
if err != nil {
t.Fatalf("ReadAll part: %v", err)
}
parts = append(parts, mimePart{
Header: mail.Header(part.Header),
Body: body,
})
}
return parts
}
func decodeBase64Part(t *testing.T, raw []byte) []byte {
t.Helper()
clean := strings.Map(func(r rune) rune {
if r == '\r' || r == '\n' {
return -1
}
return r
}, string(raw))
out, err := base64.StdEncoding.DecodeString(clean)
if err != nil {
t.Fatalf("DecodeString: %v", err)
}
return out
}

View File

@ -3,9 +3,13 @@ package smtp
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt"
"log/slog" "log/slog"
"strings"
"time" "time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/observability" "github.com/ultisuite/ulti-backend/internal/observability"
) )
@ -15,24 +19,37 @@ type OutboxSender interface {
} }
type OutboxProcessor struct { type OutboxProcessor struct {
db *pgxpool.Pool db *pgxpool.Pool
sender OutboxSender sender OutboxSender
logger *slog.Logger attachmentLoader AttachmentLoader
interval time.Duration logger *slog.Logger
maxRetries int interval time.Duration
maxRetries int
} }
func NewOutboxProcessor(db *pgxpool.Pool, sender OutboxSender, interval time.Duration, maxRetries int) *OutboxProcessor { type OutboxOption func(*OutboxProcessor)
func WithAttachmentLoader(loader AttachmentLoader) OutboxOption {
return func(p *OutboxProcessor) {
p.attachmentLoader = loader
}
}
func NewOutboxProcessor(db *pgxpool.Pool, sender OutboxSender, interval time.Duration, maxRetries int, opts ...OutboxOption) *OutboxProcessor {
if maxRetries < 1 { if maxRetries < 1 {
maxRetries = DefaultMaxOutboxRetries maxRetries = DefaultMaxOutboxRetries
} }
return &OutboxProcessor{ p := &OutboxProcessor{
db: db, db: db,
sender: sender, sender: sender,
logger: slog.Default().With("component", "outbox"), logger: slog.Default().With("component", "outbox"),
interval: interval, interval: interval,
maxRetries: maxRetries, maxRetries: maxRetries,
} }
for _, opt := range opts {
opt(p)
}
return p
} }
func (p *OutboxProcessor) Start(ctx context.Context) { func (p *OutboxProcessor) Start(ctx context.Context) {
@ -65,7 +82,7 @@ func (p *OutboxProcessor) processQueue(ctx context.Context) {
LIMIT 10 LIMIT 10
FOR UPDATE SKIP LOCKED FOR UPDATE SKIP LOCKED
) )
RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header, retry_count RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header, attachments, retry_count
`) `)
if err != nil { if err != nil {
p.logger.Error("failed to query outbox", "error", err) p.logger.Error("failed to query outbox", "error", err)
@ -75,20 +92,21 @@ func (p *OutboxProcessor) processQueue(ctx context.Context) {
for rows.Next() { for rows.Next() {
var ( var (
id string id string
accountID string accountID string
toJSON []byte toJSON []byte
ccJSON []byte ccJSON []byte
bccJSON []byte bccJSON []byte
subject string subject string
bodyText string bodyText string
bodyHTML string bodyHTML string
inReplyTo string inReplyTo string
references []string references []string
retryCount int attachmentsJSON []byte
retryCount int
) )
if err := rows.Scan(&id, &accountID, &toJSON, &ccJSON, &bccJSON, &subject, &bodyText, &bodyHTML, &inReplyTo, &references, &retryCount); err != nil { if err := rows.Scan(&id, &accountID, &toJSON, &ccJSON, &bccJSON, &subject, &bodyText, &bodyHTML, &inReplyTo, &references, &attachmentsJSON, &retryCount); err != nil {
p.logger.Error("scan outbox row", "error", err) p.logger.Error("scan outbox row", "error", err)
continue continue
} }
@ -107,50 +125,41 @@ func (p *OutboxProcessor) processQueue(ctx context.Context) {
`, accountID).Scan(&fromEmail); err != nil || fromEmail == "" { `, accountID).Scan(&fromEmail); err != nil || fromEmail == "" {
if err := p.db.QueryRow(ctx, `SELECT email FROM mail_accounts WHERE id = $1`, accountID).Scan(&fromEmail); err != nil { if err := p.db.QueryRow(ctx, `SELECT email FROM mail_accounts WHERE id = $1`, accountID).Scan(&fromEmail); err != nil {
p.logger.Error("resolve from address", "outbox_id", id, "account_id", accountID, "error", err) p.logger.Error("resolve from address", "outbox_id", id, "account_id", accountID, "error", err)
p.markSendFailure(ctx, id, retryCount, fmt.Errorf("resolve from address: %w", err))
continue continue
} }
} }
attachments, err := resolveOutboxAttachments(ctx, p.attachmentLoader, attachmentsJSON)
if err != nil {
p.logger.Error("resolve outbox attachments", "outbox_id", id, "error", err)
p.markSendFailure(ctx, id, retryCount, err)
continue
}
req := &SendRequest{ req := &SendRequest{
AccountID: accountID, AccountID: accountID,
From: fromEmail, From: fromEmail,
To: to, To: to,
Cc: cc, Cc: cc,
Bcc: bcc, Bcc: bcc,
Subject: subject, Subject: subject,
BodyText: bodyText, BodyText: bodyText,
BodyHTML: bodyHTML, BodyHTML: bodyHTML,
InReplyTo: inReplyTo, InReplyTo: inReplyTo,
References: references, References: references,
Attachments: attachments,
} }
if err := p.sender.Send(ctx, req); err != nil { if err := p.sender.Send(ctx, req); err != nil {
p.logger.Error("send failed", "outbox_id", id, "error", err) p.logger.Error("send failed", "outbox_id", id, "error", err)
observability.IncOutboxProcessed("error") observability.IncOutboxProcessed("error")
nextRetry := time.Now().Add(OutboxRetryDelay(retryCount)) p.markSendFailure(ctx, id, retryCount, err)
newRetry := retryCount + 1
status := "queued"
if newRetry >= p.maxRetries {
status = "failed"
}
if _, execErr := p.db.Exec(ctx, `
UPDATE outbox SET
status = $2,
retry_count = $3,
next_retry_at = $4,
error = $5,
updated_at = NOW()
WHERE id = $1
`, id, status, newRetry, nextRetry, err.Error()); execErr != nil {
p.logger.Error("failed to mark outbox retry", "outbox_id", id, "error", execErr)
}
} else { } else {
observability.IncOutboxProcessed("success") observability.IncOutboxProcessed("success")
if _, execErr := p.db.Exec(ctx, ` if err := p.persistSentCopyAndMarkSent(ctx, id, accountID, req, attachmentsJSON); err != nil {
UPDATE outbox SET status = 'sent', sent_at = NOW(), updated_at = NOW() p.logger.Error("persist sent copy failed", "outbox_id", id, "error", err)
WHERE id = $1 p.markSentOnly(ctx, id)
`, id); execErr != nil {
p.logger.Error("failed to mark outbox sent", "outbox_id", id, "error", execErr)
} }
} }
} }
@ -186,6 +195,183 @@ func (p *OutboxProcessor) updateQueueDepth(ctx context.Context) {
observability.SetOutboxQueueDepth(count) observability.SetOutboxQueueDepth(count)
} }
func (p *OutboxProcessor) markSendFailure(ctx context.Context, outboxID string, retryCount int, sendErr error) {
newRetry := retryCount + 1
status := "queued"
var nextRetry any = time.Now().Add(OutboxRetryDelay(retryCount))
if newRetry >= p.maxRetries {
status = "failed"
nextRetry = nil
if err := p.recordDeadLetter(ctx, outboxID, newRetry, sendErr); err != nil {
p.logger.Error("failed to write dead-letter entry", "outbox_id", outboxID, "error", err)
}
}
if _, err := p.db.Exec(ctx, `
UPDATE outbox SET
status = $2,
retry_count = $3,
next_retry_at = $4,
error = $5,
updated_at = NOW()
WHERE id = $1
`, outboxID, status, newRetry, nextRetry, sendErr.Error()); err != nil {
p.logger.Error("failed to mark outbox retry", "outbox_id", outboxID, "error", err)
}
}
func (p *OutboxProcessor) recordDeadLetter(ctx context.Context, outboxID string, attempt int, sendErr error) error {
_, err := p.db.Exec(ctx, `
INSERT INTO outbox_dead_letters (outbox_id, attempt_count, error)
VALUES ($1, $2, $3)
`, outboxID, attempt, sendErr.Error())
return err
}
func (p *OutboxProcessor) markSentOnly(ctx context.Context, outboxID string) {
if _, err := p.db.Exec(ctx, `
UPDATE outbox SET status = 'sent', sent_at = NOW(), error = '', updated_at = NOW()
WHERE id = $1
`, outboxID); err != nil {
p.logger.Error("failed to mark outbox sent", "outbox_id", outboxID, "error", err)
}
}
func (p *OutboxProcessor) persistSentCopyAndMarkSent(
ctx context.Context,
outboxID, accountID string,
req *SendRequest,
attachmentsJSON []byte,
) error {
tx, err := p.db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return err
}
defer tx.Rollback(ctx)
folderID, err := ensureSentFolder(ctx, tx, accountID)
if err != nil {
return err
}
uid, err := nextSyntheticUID(ctx, tx, folderID)
if err != nil {
return err
}
now := time.Now()
fromJSON := marshalAddressesJSON([]string{req.From})
toJSON := marshalAddressesJSON(req.To)
ccJSON := marshalAddressesJSON(req.Cc)
bccJSON := marshalAddressesJSON(req.Bcc)
snippet := snippetFromBodies(req.BodyText, req.BodyHTML)
msgID := generateMessageID(req.From)
var messageRowID string
err = tx.QueryRow(ctx, `
INSERT INTO messages (
account_id, folder_id, uid, message_id, subject,
from_addr, to_addrs, cc_addrs, bcc_addrs,
date, snippet, body_text, body_html, has_attachments,
in_reply_to, references_header
)
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12, $13, $14, $15, $16)
RETURNING id
`, accountID, folderID, uid, msgID, req.Subject, fromJSON, toJSON, ccJSON, bccJSON,
now, snippet, req.BodyText, req.BodyHTML, len(req.Attachments) > 0, req.InReplyTo, req.References,
).Scan(&messageRowID)
if err != nil {
return err
}
refs, err := parseOutboxAttachmentsJSON(attachmentsJSON)
if err != nil {
return err
}
for _, ref := range refs {
if _, err := tx.Exec(ctx, `
INSERT INTO attachments (
message_id, filename, content_type, size, s3_bucket, s3_key, content_id, is_inline
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, messageRowID, ref.Filename, ref.ContentType, ref.Size, ref.S3Bucket, ref.S3Key, ref.ContentID, ref.IsInline); err != nil {
return err
}
}
if _, err := tx.Exec(ctx, `
UPDATE outbox SET status = 'sent', sent_at = NOW(), error = '', updated_at = NOW()
WHERE id = $1
`, outboxID); err != nil {
return err
}
return tx.Commit(ctx)
}
func ensureSentFolder(ctx context.Context, tx pgx.Tx, accountID string) (string, error) {
var folderID string
err := tx.QueryRow(ctx, `
SELECT id
FROM mail_folders
WHERE account_id = $1
AND (folder_type = 'sent' OR lower(name) = 'sent' OR lower(remote_name) = 'sent')
ORDER BY created_at ASC
LIMIT 1
`, accountID).Scan(&folderID)
if err == nil {
return folderID, nil
}
if !errors.Is(err, pgx.ErrNoRows) {
return "", err
}
err = tx.QueryRow(ctx, `
INSERT INTO mail_folders (account_id, name, remote_name, folder_type, created_at, updated_at)
VALUES ($1, 'Sent', 'Sent', 'sent', NOW(), NOW())
ON CONFLICT (account_id, remote_name) DO UPDATE SET updated_at = NOW()
RETURNING id
`, accountID).Scan(&folderID)
if err != nil {
return "", err
}
return folderID, nil
}
func nextSyntheticUID(ctx context.Context, tx pgx.Tx, folderID string) (int64, error) {
var uid int64
err := tx.QueryRow(ctx, `
SELECT COALESCE(MAX(uid), 0) + 1
FROM messages
WHERE folder_id = $1
`, folderID).Scan(&uid)
return uid, err
}
func snippetFromBodies(bodyText, bodyHTML string) string {
text := strings.TrimSpace(bodyText)
if text == "" {
text = strings.TrimSpace(bodyHTML)
}
if len(text) > 200 {
return text[:200]
}
return text
}
func marshalAddressesJSON(addrs []string) []byte {
items := make([]map[string]string, 0, len(addrs))
for _, addr := range addrs {
trimmed := strings.TrimSpace(addr)
if trimmed == "" {
continue
}
items = append(items, map[string]string{"address": trimmed})
}
payload, err := json.Marshal(items)
if err != nil {
return []byte("[]")
}
return payload
}
func parseJSONAddresses(data []byte) []string { func parseJSONAddresses(data []byte) []string {
var addrs []struct { var addrs []struct {
Address string `json:"address"` Address string `json:"address"`

View File

@ -31,16 +31,17 @@ func NewSender(db *pgxpool.Pool, credManager *credentials.Manager) *Sender {
} }
type SendRequest struct { type SendRequest struct {
AccountID string AccountID string
From string From string
To []string To []string
Cc []string Cc []string
Bcc []string Bcc []string
Subject string Subject string
BodyText string BodyText string
BodyHTML string BodyHTML string
InReplyTo string InReplyTo string
References []string References []string
Attachments []SendAttachment
} }
func (s *Sender) Send(ctx context.Context, req *SendRequest) error { func (s *Sender) Send(ctx context.Context, req *SendRequest) error {
@ -87,45 +88,6 @@ func (s *Sender) Send(ctx context.Context, req *SendRequest) error {
return nil return nil
} }
func buildMessage(req *SendRequest) string {
var b strings.Builder
b.WriteString(fmt.Sprintf("From: %s\r\n", req.From))
b.WriteString(fmt.Sprintf("To: %s\r\n", strings.Join(req.To, ", ")))
if len(req.Cc) > 0 {
b.WriteString(fmt.Sprintf("Cc: %s\r\n", strings.Join(req.Cc, ", ")))
}
b.WriteString(fmt.Sprintf("Subject: %s\r\n", req.Subject))
b.WriteString(fmt.Sprintf("Date: %s\r\n", time.Now().Format(time.RFC1123Z)))
b.WriteString(fmt.Sprintf("Message-ID: %s\r\n", generateMessageID(req.From)))
b.WriteString("MIME-Version: 1.0\r\n")
if req.InReplyTo != "" {
b.WriteString(fmt.Sprintf("In-Reply-To: %s\r\n", req.InReplyTo))
}
if len(req.References) > 0 {
b.WriteString(fmt.Sprintf("References: %s\r\n", strings.Join(req.References, " ")))
}
if req.BodyHTML != "" {
boundary := fmt.Sprintf("----=_Part_%d", time.Now().UnixNano())
b.WriteString(fmt.Sprintf("Content-Type: multipart/alternative; boundary=\"%s\"\r\n", boundary))
b.WriteString("\r\n")
b.WriteString(fmt.Sprintf("--%s\r\n", boundary))
b.WriteString("Content-Type: text/plain; charset=UTF-8\r\n\r\n")
b.WriteString(req.BodyText)
b.WriteString(fmt.Sprintf("\r\n--%s\r\n", boundary))
b.WriteString("Content-Type: text/html; charset=UTF-8\r\n\r\n")
b.WriteString(req.BodyHTML)
b.WriteString(fmt.Sprintf("\r\n--%s--\r\n", boundary))
} else {
b.WriteString("Content-Type: text/plain; charset=UTF-8\r\n\r\n")
b.WriteString(req.BodyText)
}
return b.String()
}
func generateMessageID(from string) string { func generateMessageID(from string) string {
domain := "ultimail.local" domain := "ultimail.local"
if i := strings.LastIndex(from, "@"); i >= 0 && i < len(from)-1 { if i := strings.LastIndex(from, "@"); i >= 0 && i < len(from)-1 {

View File

@ -0,0 +1,5 @@
DROP INDEX IF EXISTS idx_outbox_dead_letters_outbox;
DROP TABLE IF EXISTS outbox_dead_letters;
ALTER TABLE outbox
DROP CONSTRAINT IF EXISTS outbox_status_valid_chk;

View File

@ -0,0 +1,17 @@
ALTER TABLE outbox
DROP CONSTRAINT IF EXISTS outbox_status_valid_chk;
ALTER TABLE outbox
ADD CONSTRAINT outbox_status_valid_chk
CHECK (status IN ('draft', 'queued', 'scheduled', 'sending', 'sent', 'failed', 'cancelled')) NOT VALID;
CREATE TABLE IF NOT EXISTS outbox_dead_letters (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
outbox_id UUID NOT NULL REFERENCES outbox(id) ON DELETE CASCADE,
attempt_count INT NOT NULL DEFAULT 0,
error TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_outbox_dead_letters_outbox
ON outbox_dead_letters(outbox_id, created_at DESC);

View File

@ -103,11 +103,11 @@ Objectif: transformer état actuel (partiellement implémenté) vers produit fon
### 2.3 SMTP / Outbox / Scheduling ### 2.3 SMTP / Outbox / Scheduling
- [ ] Normaliser statuts outbox (`draft`, `queued`, `scheduled`, `sending`, `sent`, `failed`, `cancelled`). - [x] Normaliser statuts outbox (`draft`, `queued`, `scheduled`, `sending`, `sent`, `failed`, `cancelled`).
- [ ] Implémenter retries exponentiels + dead-letter strategy. - [x] Implémenter retries exponentiels + dead-letter strategy.
- [ ] Implémenter "send now", "reschedule", "cancel scheduled". - [x] Implémenter "send now", "reschedule", "cancel scheduled".
- [ ] Écrire message envoyé dans dossier Sent (sync cohérente UI). - [x] Écrire message envoyé dans dossier Sent (sync cohérente UI).
- [ ] Gérer inline attachments MIME multipart mixed/alternative. - [x] Gérer inline attachments MIME multipart mixed/alternative.
### 2.4 Rules & Webhooks ### 2.4 Rules & Webhooks