ultisuite-backend/internal/mail/smtp/outbox.go
R3D347HR4Y 4eadb91a64 Enhance mail API with rate limiting, idempotency, and attachment management
- Added rate limiting for outbound email sends to prevent abuse, implemented in `internal/api/mail/sendguard`.
- Introduced idempotency key support for email sending to avoid duplicate submissions.
- Enhanced attachment handling with new limits and validation in `internal/api/mail/limits`.
- Updated outbox processing to include retry logic and circuit breaker for SMTP failures.
- Improved HTML sanitization for email content to enhance security.
- Added unit tests for new features, ensuring robust functionality and error handling.
- Updated configuration options in `.env.example` for new mail settings.
2026-05-22 17:19:16 +02:00

207 lines
5.3 KiB
Go

package smtp
import (
"context"
"encoding/json"
"log/slog"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/observability"
)
// OutboxSender is the delivery dependency of OutboxProcessor. The processor
// hands it one assembled SendRequest per claimed outbox row.
type OutboxSender interface {
// Send delivers req. A non-nil error is treated by the processor as a failed
// attempt: the row is re-queued with a later retry time, or marked failed
// once the retry limit is reached.
Send(ctx context.Context, req *SendRequest) error
}
// OutboxProcessor periodically promotes scheduled outbox rows, sends queued
// ones through an OutboxSender, and reports the queue depth metric.
type OutboxProcessor struct {
// db is the pool every outbox, identity and account query runs against.
db *pgxpool.Pool
// sender performs the actual delivery of each SendRequest.
sender OutboxSender
// logger is the default slog logger tagged with component=outbox.
logger *slog.Logger
// interval is the period of the ticker that drives Start.
interval time.Duration
// maxRetries is the retry count at which a failing row is marked 'failed'
// instead of being re-queued.
maxRetries int
}
// NewOutboxProcessor wires an OutboxProcessor around the given pool and sender.
//
// interval is the polling period later used by Start. maxRetries is the retry
// count at which a failing row is marked 'failed'; any value below 1 is
// replaced by DefaultMaxOutboxRetries. The logger is derived from
// slog.Default() and tagged with component=outbox.
func NewOutboxProcessor(db *pgxpool.Pool, sender OutboxSender, interval time.Duration, maxRetries int) *OutboxProcessor {
	proc := &OutboxProcessor{
		db:         db,
		sender:     sender,
		logger:     slog.Default().With("component", "outbox"),
		interval:   interval,
		maxRetries: DefaultMaxOutboxRetries,
	}
	// Only a usable caller-supplied limit overrides the package default.
	if maxRetries >= 1 {
		proc.maxRetries = maxRetries
	}
	return proc
}
// Start runs the processing loop and blocks until ctx is cancelled.
//
// On every tick of p.interval it runs, in order: promotion of due scheduled
// rows, delivery of queued rows, and the queue depth metric refresh. Nothing
// is processed before the first tick fires.
func (p *OutboxProcessor) Start(ctx context.Context) {
	tick := time.NewTicker(p.interval)
	defer tick.Stop()

	p.logger.Info("outbox processor started", "interval", p.interval)

	// One full processing pass; the order of the three steps is significant
	// because rows promoted by the first step become eligible for the second.
	cycle := func() {
		p.processScheduled(ctx)
		p.processQueue(ctx)
		p.updateQueueDepth(ctx)
	}

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			cycle()
		case <-done:
			p.logger.Info("outbox processor stopped")
			return
		}
	}
}
func (p *OutboxProcessor) processQueue(ctx context.Context) {
rows, err := p.db.Query(ctx, `
UPDATE outbox SET status = 'sending', updated_at = NOW()
WHERE id IN (
SELECT id FROM outbox
WHERE status = 'queued'
AND (next_retry_at IS NULL OR next_retry_at <= NOW())
ORDER BY created_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED
)
RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header, retry_count
`)
if err != nil {
p.logger.Error("failed to query outbox", "error", err)
return
}
defer rows.Close()
for rows.Next() {
var (
id string
accountID string
toJSON []byte
ccJSON []byte
bccJSON []byte
subject string
bodyText string
bodyHTML string
inReplyTo string
references []string
retryCount int
)
if err := rows.Scan(&id, &accountID, &toJSON, &ccJSON, &bccJSON, &subject, &bodyText, &bodyHTML, &inReplyTo, &references, &retryCount); err != nil {
p.logger.Error("scan outbox row", "error", err)
continue
}
to := parseJSONAddresses(toJSON)
cc := parseJSONAddresses(ccJSON)
bcc := parseJSONAddresses(bccJSON)
// Get the from address
var fromEmail string
if err := p.db.QueryRow(ctx, `
SELECT mi.email FROM mail_identities mi
JOIN mail_accounts ma ON mi.account_id = ma.id
WHERE ma.id = $1 AND mi.is_default = true
LIMIT 1
`, accountID).Scan(&fromEmail); err != nil || fromEmail == "" {
if err := p.db.QueryRow(ctx, `SELECT email FROM mail_accounts WHERE id = $1`, accountID).Scan(&fromEmail); err != nil {
p.logger.Error("resolve from address", "outbox_id", id, "account_id", accountID, "error", err)
continue
}
}
req := &SendRequest{
AccountID: accountID,
From: fromEmail,
To: to,
Cc: cc,
Bcc: bcc,
Subject: subject,
BodyText: bodyText,
BodyHTML: bodyHTML,
InReplyTo: inReplyTo,
References: references,
}
if err := p.sender.Send(ctx, req); err != nil {
p.logger.Error("send failed", "outbox_id", id, "error", err)
observability.IncOutboxProcessed("error")
nextRetry := time.Now().Add(OutboxRetryDelay(retryCount))
newRetry := retryCount + 1
status := "queued"
if newRetry >= p.maxRetries {
status = "failed"
}
if _, execErr := p.db.Exec(ctx, `
UPDATE outbox SET
status = $2,
retry_count = $3,
next_retry_at = $4,
error = $5,
updated_at = NOW()
WHERE id = $1
`, id, status, newRetry, nextRetry, err.Error()); execErr != nil {
p.logger.Error("failed to mark outbox retry", "outbox_id", id, "error", execErr)
}
} else {
observability.IncOutboxProcessed("success")
if _, execErr := p.db.Exec(ctx, `
UPDATE outbox SET status = 'sent', sent_at = NOW(), updated_at = NOW()
WHERE id = $1
`, id); execErr != nil {
p.logger.Error("failed to mark outbox sent", "outbox_id", id, "error", execErr)
}
}
}
if err := rows.Err(); err != nil {
p.logger.Error("iterate outbox rows", "error", err)
}
}
// processScheduled moves every 'scheduled' outbox row whose scheduled_at has
// passed into the 'queued' state, and logs how many rows were promoted when
// that number is greater than zero.
func (p *OutboxProcessor) processScheduled(ctx context.Context) {
	const promoteDue = `
UPDATE outbox SET status = 'queued', updated_at = NOW()
WHERE status = 'scheduled' AND scheduled_at IS NOT NULL AND scheduled_at <= NOW()
`
	tag, err := p.db.Exec(ctx, promoteDue)
	if err != nil {
		p.logger.Error("failed to process scheduled", "error", err)
		return
	}

	promoted := tag.RowsAffected()
	if promoted <= 0 {
		return
	}
	p.logger.Info("promoted scheduled outbox rows", "count", promoted)
}
// updateQueueDepth publishes the number of outbox rows that are still pending
// (status 'queued', 'scheduled' or 'sending') as the outbox queue depth
// metric. On a query error the metric is left at its previous value.
func (p *OutboxProcessor) updateQueueDepth(ctx context.Context) {
	var depth int64

	pending := p.db.QueryRow(ctx, `
SELECT COUNT(*) FROM outbox
WHERE status IN ('queued', 'scheduled', 'sending')
`)
	if err := pending.Scan(&depth); err != nil {
		p.logger.Error("failed to compute outbox queue depth", "error", err)
		return
	}

	observability.SetOutboxQueueDepth(depth)
}
// parseJSONAddresses decodes a JSON recipient list into plain address strings.
//
// Two encodings are accepted: an array of objects, from which the "address"
// member of each element is taken (an element without one yields ""), and an
// array of plain strings. Input that matches neither form returns nil.
func parseJSONAddresses(data []byte) []string {
	type entry struct {
		Address string `json:"address"`
	}

	var entries []entry
	if json.Unmarshal(data, &entries) == nil {
		out := make([]string, len(entries))
		for i := range entries {
			out[i] = entries[i].Address
		}
		return out
	}

	// Not an array of objects; try the plain string array form.
	var plain []string
	if json.Unmarshal(data, &plain) != nil {
		return nil
	}
	return plain
}