- Added new API endpoints for sending, rescheduling, and canceling scheduled outbox messages.
- Implemented outbox processing logic to handle attachments and manage message statuses.
- Introduced a dead-letter strategy for failed outbox messages, enhancing reliability.
- Updated database schema to support new outbox statuses and dead-letter entries.
- Enhanced unit tests for outbox functionalities, ensuring robust error handling and validation.
- Improved attachment handling in the outbox processor to support inline and regular attachments.
393 lines
11 KiB
Go
393 lines
11 KiB
Go
package smtp
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/ultisuite/ulti-backend/internal/observability"
|
|
)
|
|
|
|
type OutboxSender interface {
|
|
Send(ctx context.Context, req *SendRequest) error
|
|
}
|
|
|
|
type OutboxProcessor struct {
|
|
db *pgxpool.Pool
|
|
sender OutboxSender
|
|
attachmentLoader AttachmentLoader
|
|
logger *slog.Logger
|
|
interval time.Duration
|
|
maxRetries int
|
|
}
|
|
|
|
// OutboxOption mutates an OutboxProcessor during construction; NewOutboxProcessor
// applies the options in order after the required fields have been set.
type OutboxOption func(*OutboxProcessor)
|
|
|
|
func WithAttachmentLoader(loader AttachmentLoader) OutboxOption {
|
|
return func(p *OutboxProcessor) {
|
|
p.attachmentLoader = loader
|
|
}
|
|
}
|
|
|
|
func NewOutboxProcessor(db *pgxpool.Pool, sender OutboxSender, interval time.Duration, maxRetries int, opts ...OutboxOption) *OutboxProcessor {
|
|
if maxRetries < 1 {
|
|
maxRetries = DefaultMaxOutboxRetries
|
|
}
|
|
p := &OutboxProcessor{
|
|
db: db,
|
|
sender: sender,
|
|
logger: slog.Default().With("component", "outbox"),
|
|
interval: interval,
|
|
maxRetries: maxRetries,
|
|
}
|
|
for _, opt := range opts {
|
|
opt(p)
|
|
}
|
|
return p
|
|
}
|
|
|
|
func (p *OutboxProcessor) Start(ctx context.Context) {
|
|
ticker := time.NewTicker(p.interval)
|
|
defer ticker.Stop()
|
|
|
|
p.logger.Info("outbox processor started", "interval", p.interval)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
p.logger.Info("outbox processor stopped")
|
|
return
|
|
case <-ticker.C:
|
|
p.processScheduled(ctx)
|
|
p.processQueue(ctx)
|
|
p.updateQueueDepth(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *OutboxProcessor) processQueue(ctx context.Context) {
|
|
rows, err := p.db.Query(ctx, `
|
|
UPDATE outbox SET status = 'sending', updated_at = NOW()
|
|
WHERE id IN (
|
|
SELECT id FROM outbox
|
|
WHERE status = 'queued'
|
|
AND (next_retry_at IS NULL OR next_retry_at <= NOW())
|
|
ORDER BY created_at ASC
|
|
LIMIT 10
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header, attachments, retry_count
|
|
`)
|
|
if err != nil {
|
|
p.logger.Error("failed to query outbox", "error", err)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var (
|
|
id string
|
|
accountID string
|
|
toJSON []byte
|
|
ccJSON []byte
|
|
bccJSON []byte
|
|
subject string
|
|
bodyText string
|
|
bodyHTML string
|
|
inReplyTo string
|
|
references []string
|
|
attachmentsJSON []byte
|
|
retryCount int
|
|
)
|
|
|
|
if err := rows.Scan(&id, &accountID, &toJSON, &ccJSON, &bccJSON, &subject, &bodyText, &bodyHTML, &inReplyTo, &references, &attachmentsJSON, &retryCount); err != nil {
|
|
p.logger.Error("scan outbox row", "error", err)
|
|
continue
|
|
}
|
|
|
|
to := parseJSONAddresses(toJSON)
|
|
cc := parseJSONAddresses(ccJSON)
|
|
bcc := parseJSONAddresses(bccJSON)
|
|
|
|
// Get the from address
|
|
var fromEmail string
|
|
if err := p.db.QueryRow(ctx, `
|
|
SELECT mi.email FROM mail_identities mi
|
|
JOIN mail_accounts ma ON mi.account_id = ma.id
|
|
WHERE ma.id = $1 AND mi.is_default = true
|
|
LIMIT 1
|
|
`, accountID).Scan(&fromEmail); err != nil || fromEmail == "" {
|
|
if err := p.db.QueryRow(ctx, `SELECT email FROM mail_accounts WHERE id = $1`, accountID).Scan(&fromEmail); err != nil {
|
|
p.logger.Error("resolve from address", "outbox_id", id, "account_id", accountID, "error", err)
|
|
p.markSendFailure(ctx, id, retryCount, fmt.Errorf("resolve from address: %w", err))
|
|
continue
|
|
}
|
|
}
|
|
|
|
attachments, err := resolveOutboxAttachments(ctx, p.attachmentLoader, attachmentsJSON)
|
|
if err != nil {
|
|
p.logger.Error("resolve outbox attachments", "outbox_id", id, "error", err)
|
|
p.markSendFailure(ctx, id, retryCount, err)
|
|
continue
|
|
}
|
|
|
|
req := &SendRequest{
|
|
AccountID: accountID,
|
|
From: fromEmail,
|
|
To: to,
|
|
Cc: cc,
|
|
Bcc: bcc,
|
|
Subject: subject,
|
|
BodyText: bodyText,
|
|
BodyHTML: bodyHTML,
|
|
InReplyTo: inReplyTo,
|
|
References: references,
|
|
Attachments: attachments,
|
|
}
|
|
|
|
if err := p.sender.Send(ctx, req); err != nil {
|
|
p.logger.Error("send failed", "outbox_id", id, "error", err)
|
|
observability.IncOutboxProcessed("error")
|
|
p.markSendFailure(ctx, id, retryCount, err)
|
|
} else {
|
|
observability.IncOutboxProcessed("success")
|
|
if err := p.persistSentCopyAndMarkSent(ctx, id, accountID, req, attachmentsJSON); err != nil {
|
|
p.logger.Error("persist sent copy failed", "outbox_id", id, "error", err)
|
|
p.markSentOnly(ctx, id)
|
|
}
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
p.logger.Error("iterate outbox rows", "error", err)
|
|
}
|
|
}
|
|
|
|
func (p *OutboxProcessor) processScheduled(ctx context.Context) {
|
|
result, err := p.db.Exec(ctx, `
|
|
UPDATE outbox SET status = 'queued', updated_at = NOW()
|
|
WHERE status = 'scheduled' AND scheduled_at IS NOT NULL AND scheduled_at <= NOW()
|
|
`)
|
|
if err != nil {
|
|
p.logger.Error("failed to process scheduled", "error", err)
|
|
return
|
|
}
|
|
if n := result.RowsAffected(); n > 0 {
|
|
p.logger.Info("promoted scheduled outbox rows", "count", n)
|
|
}
|
|
}
|
|
|
|
func (p *OutboxProcessor) updateQueueDepth(ctx context.Context) {
|
|
var count int64
|
|
err := p.db.QueryRow(ctx, `
|
|
SELECT COUNT(*) FROM outbox
|
|
WHERE status IN ('queued', 'scheduled', 'sending')
|
|
`).Scan(&count)
|
|
if err != nil {
|
|
p.logger.Error("failed to compute outbox queue depth", "error", err)
|
|
return
|
|
}
|
|
observability.SetOutboxQueueDepth(count)
|
|
}
|
|
|
|
func (p *OutboxProcessor) markSendFailure(ctx context.Context, outboxID string, retryCount int, sendErr error) {
|
|
newRetry := retryCount + 1
|
|
status := "queued"
|
|
var nextRetry any = time.Now().Add(OutboxRetryDelay(retryCount))
|
|
if newRetry >= p.maxRetries {
|
|
status = "failed"
|
|
nextRetry = nil
|
|
if err := p.recordDeadLetter(ctx, outboxID, newRetry, sendErr); err != nil {
|
|
p.logger.Error("failed to write dead-letter entry", "outbox_id", outboxID, "error", err)
|
|
}
|
|
}
|
|
if _, err := p.db.Exec(ctx, `
|
|
UPDATE outbox SET
|
|
status = $2,
|
|
retry_count = $3,
|
|
next_retry_at = $4,
|
|
error = $5,
|
|
updated_at = NOW()
|
|
WHERE id = $1
|
|
`, outboxID, status, newRetry, nextRetry, sendErr.Error()); err != nil {
|
|
p.logger.Error("failed to mark outbox retry", "outbox_id", outboxID, "error", err)
|
|
}
|
|
}
|
|
|
|
func (p *OutboxProcessor) recordDeadLetter(ctx context.Context, outboxID string, attempt int, sendErr error) error {
|
|
_, err := p.db.Exec(ctx, `
|
|
INSERT INTO outbox_dead_letters (outbox_id, attempt_count, error)
|
|
VALUES ($1, $2, $3)
|
|
`, outboxID, attempt, sendErr.Error())
|
|
return err
|
|
}
|
|
|
|
func (p *OutboxProcessor) markSentOnly(ctx context.Context, outboxID string) {
|
|
if _, err := p.db.Exec(ctx, `
|
|
UPDATE outbox SET status = 'sent', sent_at = NOW(), error = '', updated_at = NOW()
|
|
WHERE id = $1
|
|
`, outboxID); err != nil {
|
|
p.logger.Error("failed to mark outbox sent", "outbox_id", outboxID, "error", err)
|
|
}
|
|
}
|
|
|
|
func (p *OutboxProcessor) persistSentCopyAndMarkSent(
|
|
ctx context.Context,
|
|
outboxID, accountID string,
|
|
req *SendRequest,
|
|
attachmentsJSON []byte,
|
|
) error {
|
|
tx, err := p.db.BeginTx(ctx, pgx.TxOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
folderID, err := ensureSentFolder(ctx, tx, accountID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
uid, err := nextSyntheticUID(ctx, tx, folderID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
now := time.Now()
|
|
fromJSON := marshalAddressesJSON([]string{req.From})
|
|
toJSON := marshalAddressesJSON(req.To)
|
|
ccJSON := marshalAddressesJSON(req.Cc)
|
|
bccJSON := marshalAddressesJSON(req.Bcc)
|
|
snippet := snippetFromBodies(req.BodyText, req.BodyHTML)
|
|
msgID := generateMessageID(req.From)
|
|
|
|
var messageRowID string
|
|
err = tx.QueryRow(ctx, `
|
|
INSERT INTO messages (
|
|
account_id, folder_id, uid, message_id, subject,
|
|
from_addr, to_addrs, cc_addrs, bcc_addrs,
|
|
date, snippet, body_text, body_html, has_attachments,
|
|
in_reply_to, references_header
|
|
)
|
|
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12, $13, $14, $15, $16)
|
|
RETURNING id
|
|
`, accountID, folderID, uid, msgID, req.Subject, fromJSON, toJSON, ccJSON, bccJSON,
|
|
now, snippet, req.BodyText, req.BodyHTML, len(req.Attachments) > 0, req.InReplyTo, req.References,
|
|
).Scan(&messageRowID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
refs, err := parseOutboxAttachmentsJSON(attachmentsJSON)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, ref := range refs {
|
|
if _, err := tx.Exec(ctx, `
|
|
INSERT INTO attachments (
|
|
message_id, filename, content_type, size, s3_bucket, s3_key, content_id, is_inline
|
|
)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
`, messageRowID, ref.Filename, ref.ContentType, ref.Size, ref.S3Bucket, ref.S3Key, ref.ContentID, ref.IsInline); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if _, err := tx.Exec(ctx, `
|
|
UPDATE outbox SET status = 'sent', sent_at = NOW(), error = '', updated_at = NOW()
|
|
WHERE id = $1
|
|
`, outboxID); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit(ctx)
|
|
}
|
|
|
|
func ensureSentFolder(ctx context.Context, tx pgx.Tx, accountID string) (string, error) {
|
|
var folderID string
|
|
err := tx.QueryRow(ctx, `
|
|
SELECT id
|
|
FROM mail_folders
|
|
WHERE account_id = $1
|
|
AND (folder_type = 'sent' OR lower(name) = 'sent' OR lower(remote_name) = 'sent')
|
|
ORDER BY created_at ASC
|
|
LIMIT 1
|
|
`, accountID).Scan(&folderID)
|
|
if err == nil {
|
|
return folderID, nil
|
|
}
|
|
if !errors.Is(err, pgx.ErrNoRows) {
|
|
return "", err
|
|
}
|
|
err = tx.QueryRow(ctx, `
|
|
INSERT INTO mail_folders (account_id, name, remote_name, folder_type, created_at, updated_at)
|
|
VALUES ($1, 'Sent', 'Sent', 'sent', NOW(), NOW())
|
|
ON CONFLICT (account_id, remote_name) DO UPDATE SET updated_at = NOW()
|
|
RETURNING id
|
|
`, accountID).Scan(&folderID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return folderID, nil
|
|
}
|
|
|
|
func nextSyntheticUID(ctx context.Context, tx pgx.Tx, folderID string) (int64, error) {
|
|
var uid int64
|
|
err := tx.QueryRow(ctx, `
|
|
SELECT COALESCE(MAX(uid), 0) + 1
|
|
FROM messages
|
|
WHERE folder_id = $1
|
|
`, folderID).Scan(&uid)
|
|
return uid, err
|
|
}
|
|
|
|
// snippetFromBodies returns a short preview of a message: the trimmed plain
// text body, or the trimmed HTML body when the plain text is empty.
//
// The result is at most 200 bytes long. When the text is longer, it is cut at
// the last rune boundary at or before byte 200, so a multi-byte UTF-8
// character is never split in half (a split would leave an invalid UTF-8
// string). For pure ASCII input this is exactly the first 200 bytes.
func snippetFromBodies(bodyText, bodyHTML string) string {
	const maxSnippetBytes = 200

	text := strings.TrimSpace(bodyText)
	if text == "" {
		text = strings.TrimSpace(bodyHTML)
	}
	if len(text) <= maxSnippetBytes {
		return text
	}

	// Ranging over a string yields the byte offset at which each rune starts;
	// remember the largest such offset that does not exceed the limit.
	cut := 0
	for i := range text {
		if i > maxSnippetBytes {
			break
		}
		cut = i
	}
	return text[:cut]
}
|
|
|
|
// marshalAddressesJSON encodes addrs as a JSON array of {"address": "..."}
// objects. Each address is whitespace-trimmed and blank entries are dropped;
// an empty or nil input encodes as "[]". If encoding fails, "[]" is returned.
func marshalAddressesJSON(addrs []string) []byte {
	type entry = map[string]string

	entries := make([]entry, 0, len(addrs))
	for i := range addrs {
		if cleaned := strings.TrimSpace(addrs[i]); cleaned != "" {
			entries = append(entries, entry{"address": cleaned})
		}
	}

	if encoded, err := json.Marshal(entries); err == nil {
		return encoded
	}
	return []byte("[]")
}
|
|
|
|
// parseJSONAddresses decodes a JSON address list into plain strings. It
// accepts an array of {"address": "..."} objects and, failing that, an array
// of plain strings. Input that matches neither form yields nil.
func parseJSONAddresses(data []byte) []string {
	var objects []struct {
		Address string `json:"address"`
	}
	if err := json.Unmarshal(data, &objects); err == nil {
		out := make([]string, len(objects))
		for i := range objects {
			out[i] = objects[i].Address
		}
		return out
	}

	// Not an array of objects; fall back to a plain string array.
	var plain []string
	if json.Unmarshal(data, &plain) != nil {
		return nil
	}
	return plain
}
|