package smtp import ( "context" "encoding/json" "errors" "fmt" "log/slog" "strings" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/ultisuite/ulti-backend/internal/observability" ) type OutboxSender interface { Send(ctx context.Context, req *SendRequest) error } type OutboxProcessor struct { db *pgxpool.Pool sender OutboxSender attachmentLoader AttachmentLoader logger *slog.Logger interval time.Duration maxRetries int } type OutboxOption func(*OutboxProcessor) func WithAttachmentLoader(loader AttachmentLoader) OutboxOption { return func(p *OutboxProcessor) { p.attachmentLoader = loader } } func NewOutboxProcessor(db *pgxpool.Pool, sender OutboxSender, interval time.Duration, maxRetries int, opts ...OutboxOption) *OutboxProcessor { if maxRetries < 1 { maxRetries = DefaultMaxOutboxRetries } p := &OutboxProcessor{ db: db, sender: sender, logger: slog.Default().With("component", "outbox"), interval: interval, maxRetries: maxRetries, } for _, opt := range opts { opt(p) } return p } func (p *OutboxProcessor) Start(ctx context.Context) { ticker := time.NewTicker(p.interval) defer ticker.Stop() p.logger.Info("outbox processor started", "interval", p.interval) for { select { case <-ctx.Done(): p.logger.Info("outbox processor stopped") return case <-ticker.C: p.processScheduled(ctx) p.processQueue(ctx) p.updateQueueDepth(ctx) } } } func (p *OutboxProcessor) processQueue(ctx context.Context) { rows, err := p.db.Query(ctx, ` UPDATE outbox SET status = 'sending', updated_at = NOW() WHERE id IN ( SELECT id FROM outbox WHERE status = 'queued' AND (next_retry_at IS NULL OR next_retry_at <= NOW()) ORDER BY created_at ASC LIMIT 10 FOR UPDATE SKIP LOCKED ) RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header, attachments, retry_count `) if err != nil { p.logger.Error("failed to query outbox", "error", err) return } defer rows.Close() for rows.Next() { var ( id string accountID string toJSON []byte ccJSON []byte bccJSON []byte subject string bodyText string bodyHTML string inReplyTo string references []string attachmentsJSON []byte retryCount int ) if err := rows.Scan(&id, &accountID, &toJSON, &ccJSON, &bccJSON, &subject, &bodyText, &bodyHTML, &inReplyTo, &references, &attachmentsJSON, &retryCount); err != nil { p.logger.Error("scan outbox row", "error", err) continue } to := parseJSONAddresses(toJSON) cc := parseJSONAddresses(ccJSON) bcc := parseJSONAddresses(bccJSON) // Get the from address var fromEmail string if err := p.db.QueryRow(ctx, ` SELECT mi.email FROM mail_identities mi JOIN mail_accounts ma ON mi.account_id = ma.id WHERE ma.id = $1 AND mi.is_default = true LIMIT 1 `, accountID).Scan(&fromEmail); err != nil || fromEmail == "" { if err := p.db.QueryRow(ctx, `SELECT email FROM mail_accounts WHERE id = $1`, accountID).Scan(&fromEmail); err != nil { p.logger.Error("resolve from address", "outbox_id", id, "account_id", accountID, "error", err) p.markSendFailure(ctx, id, retryCount, fmt.Errorf("resolve from address: %w", err)) continue } } attachments, err := resolveOutboxAttachments(ctx, p.attachmentLoader, attachmentsJSON) if err != nil { p.logger.Error("resolve outbox attachments", "outbox_id", id, "error", err) p.markSendFailure(ctx, id, retryCount, err) continue } req := &SendRequest{ AccountID: accountID, From: fromEmail, To: to, Cc: cc, Bcc: bcc, Subject: subject, BodyText: bodyText, BodyHTML: bodyHTML, InReplyTo: inReplyTo, References: references, Attachments: attachments, } if err := p.sender.Send(ctx, req); err != nil { p.logger.Error("send failed", "outbox_id", id, "error", err) observability.IncOutboxProcessed("error") p.markSendFailure(ctx, id, retryCount, err) } else { observability.IncOutboxProcessed("success") if err := p.persistSentCopyAndMarkSent(ctx, id, accountID, req, attachmentsJSON); err != nil { p.logger.Error("persist sent copy failed", "outbox_id", id, "error", err) p.markSentOnly(ctx, id) } } } if err := rows.Err(); err != nil { p.logger.Error("iterate outbox rows", "error", err) } } func (p *OutboxProcessor) processScheduled(ctx context.Context) { result, err := p.db.Exec(ctx, ` UPDATE outbox SET status = 'queued', updated_at = NOW() WHERE status = 'scheduled' AND scheduled_at IS NOT NULL AND scheduled_at <= NOW() `) if err != nil { p.logger.Error("failed to process scheduled", "error", err) return } if n := result.RowsAffected(); n > 0 { p.logger.Info("promoted scheduled outbox rows", "count", n) } } func (p *OutboxProcessor) updateQueueDepth(ctx context.Context) { var count int64 err := p.db.QueryRow(ctx, ` SELECT COUNT(*) FROM outbox WHERE status IN ('queued', 'scheduled', 'sending') `).Scan(&count) if err != nil { p.logger.Error("failed to compute outbox queue depth", "error", err) return } observability.SetOutboxQueueDepth(count) } func (p *OutboxProcessor) markSendFailure(ctx context.Context, outboxID string, retryCount int, sendErr error) { newRetry := retryCount + 1 status := "queued" var nextRetry any = time.Now().Add(OutboxRetryDelay(retryCount)) if newRetry >= p.maxRetries { status = "failed" nextRetry = nil if err := p.recordDeadLetter(ctx, outboxID, newRetry, sendErr); err != nil { p.logger.Error("failed to write dead-letter entry", "outbox_id", outboxID, "error", err) } } if _, err := p.db.Exec(ctx, ` UPDATE outbox SET status = $2, retry_count = $3, next_retry_at = $4, error = $5, updated_at = NOW() WHERE id = $1 `, outboxID, status, newRetry, nextRetry, sendErr.Error()); err != nil { p.logger.Error("failed to mark outbox retry", "outbox_id", outboxID, "error", err) } } func (p *OutboxProcessor) recordDeadLetter(ctx context.Context, outboxID string, attempt int, sendErr error) error { _, err := p.db.Exec(ctx, ` INSERT INTO outbox_dead_letters (outbox_id, attempt_count, error) VALUES ($1, $2, $3) `, outboxID, attempt, sendErr.Error()) return err } func (p *OutboxProcessor) markSentOnly(ctx context.Context, outboxID string) { if _, err := p.db.Exec(ctx, ` UPDATE outbox SET status = 'sent', sent_at = NOW(), error = '', updated_at = NOW() WHERE id = $1 `, outboxID); err != nil { p.logger.Error("failed to mark outbox sent", "outbox_id", outboxID, "error", err) } } func (p *OutboxProcessor) persistSentCopyAndMarkSent( ctx context.Context, outboxID, accountID string, req *SendRequest, attachmentsJSON []byte, ) error { tx, err := p.db.BeginTx(ctx, pgx.TxOptions{}) if err != nil { return err } defer tx.Rollback(ctx) folderID, err := ensureSentFolder(ctx, tx, accountID) if err != nil { return err } uid, err := nextSyntheticUID(ctx, tx, folderID) if err != nil { return err } now := time.Now() fromJSON := marshalAddressesJSON([]string{req.From}) toJSON := marshalAddressesJSON(req.To) ccJSON := marshalAddressesJSON(req.Cc) bccJSON := marshalAddressesJSON(req.Bcc) snippet := snippetFromBodies(req.BodyText, req.BodyHTML) msgID := generateMessageID(req.From) var messageRowID string err = tx.QueryRow(ctx, ` INSERT INTO messages ( account_id, folder_id, uid, message_id, subject, from_addr, to_addrs, cc_addrs, bcc_addrs, date, snippet, body_text, body_html, has_attachments, in_reply_to, references_header ) VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12, $13, $14, $15, $16) RETURNING id `, accountID, folderID, uid, msgID, req.Subject, fromJSON, toJSON, ccJSON, bccJSON, now, snippet, req.BodyText, req.BodyHTML, len(req.Attachments) > 0, req.InReplyTo, req.References, ).Scan(&messageRowID) if err != nil { return err } refs, err := parseOutboxAttachmentsJSON(attachmentsJSON) if err != nil { return err } for _, ref := range refs { if _, err := tx.Exec(ctx, ` INSERT INTO attachments ( message_id, filename, content_type, size, s3_bucket, s3_key, content_id, is_inline ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) `, messageRowID, ref.Filename, ref.ContentType, ref.Size, ref.S3Bucket, ref.S3Key, ref.ContentID, ref.IsInline); err != nil { return err } } if _, err := tx.Exec(ctx, ` UPDATE outbox SET status = 'sent', sent_at = NOW(), error = '', updated_at = NOW() WHERE id = $1 `, outboxID); err != nil { return err } return tx.Commit(ctx) } func ensureSentFolder(ctx context.Context, tx pgx.Tx, accountID string) (string, error) { var folderID string err := tx.QueryRow(ctx, ` SELECT id FROM mail_folders WHERE account_id = $1 AND (folder_type = 'sent' OR lower(name) = 'sent' OR lower(remote_name) = 'sent') ORDER BY created_at ASC LIMIT 1 `, accountID).Scan(&folderID) if err == nil { return folderID, nil } if !errors.Is(err, pgx.ErrNoRows) { return "", err } err = tx.QueryRow(ctx, ` INSERT INTO mail_folders (account_id, name, remote_name, folder_type, created_at, updated_at) VALUES ($1, 'Sent', 'Sent', 'sent', NOW(), NOW()) ON CONFLICT (account_id, remote_name) DO UPDATE SET updated_at = NOW() RETURNING id `, accountID).Scan(&folderID) if err != nil { return "", err } return folderID, nil } func nextSyntheticUID(ctx context.Context, tx pgx.Tx, folderID string) (int64, error) { var uid int64 err := tx.QueryRow(ctx, ` SELECT COALESCE(MAX(uid), 0) + 1 FROM messages WHERE folder_id = $1 `, folderID).Scan(&uid) return uid, err } func snippetFromBodies(bodyText, bodyHTML string) string { text := strings.TrimSpace(bodyText) if text == "" { text = strings.TrimSpace(bodyHTML) } if len(text) > 200 { return text[:200] } return text } func marshalAddressesJSON(addrs []string) []byte { items := make([]map[string]string, 0, len(addrs)) for _, addr := range addrs { trimmed := strings.TrimSpace(addr) if trimmed == "" { continue } items = append(items, map[string]string{"address": trimmed}) } payload, err := json.Marshal(items) if err != nil { return []byte("[]") } return payload } func parseJSONAddresses(data []byte) []string { var addrs []struct { Address string `json:"address"` } if err := json.Unmarshal(data, &addrs); err != nil { // Try as plain string array var plain []string if err := json.Unmarshal(data, &plain); err == nil { return plain } return nil } result := make([]string, 0, len(addrs)) for _, a := range addrs { result = append(result, a.Address) } return result }