package smtp

import (
	"context"
	"encoding/json"
	"log/slog"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/ultisuite/ulti-backend/internal/observability"
)

// OutboxProcessor polls the outbox table on a fixed interval, promotes
// scheduled rows that are due, sends queued rows through the Sender and
// publishes the queue depth metric.
type OutboxProcessor struct {
	db       *pgxpool.Pool
	sender   *Sender
	logger   *slog.Logger
	interval time.Duration
}

// claimedOutboxRow is one outbox row that claimQueued has moved to 'sending'.
type claimedOutboxRow struct {
	id         string
	accountID  string
	to         []string
	cc         []string
	bcc        []string
	subject    string
	bodyText   string
	bodyHTML   string
	inReplyTo  string
	references []string
}

// NewOutboxProcessor returns a processor that uses db and sender and runs one
// pass every interval. interval must be positive: Start passes it to
// time.NewTicker, which panics otherwise.
func NewOutboxProcessor(db *pgxpool.Pool, sender *Sender, interval time.Duration) *OutboxProcessor {
	return &OutboxProcessor{
		db:       db,
		sender:   sender,
		logger:   slog.Default().With("component", "outbox"),
		interval: interval,
	}
}

// Start blocks until ctx is cancelled. On every tick it promotes due scheduled
// rows, processes the queue and refreshes the queue depth metric, in that
// order. The first pass runs after the first tick, not immediately.
func (p *OutboxProcessor) Start(ctx context.Context) {
	ticker := time.NewTicker(p.interval)
	defer ticker.Stop()

	p.logger.Info("outbox processor started", "interval", p.interval)

	for {
		select {
		case <-ctx.Done():
			p.logger.Info("outbox processor stopped")
			return
		case <-ticker.C:
			p.processScheduled(ctx)
			p.processQueue(ctx)
			p.updateQueueDepth(ctx)
		}
	}
}

// processQueue claims up to ten queued rows and attempts to send each one.
//
// The batch is read in full and its result set closed before any message is
// sent. That way the claiming statement has finished and its pooled connection
// has been returned before the per-row lookups and status updates ask the pool
// for connections of their own.
func (p *OutboxProcessor) processQueue(ctx context.Context) {
	for _, msg := range p.claimQueued(ctx) {
		p.deliver(ctx, msg)
	}
}

// claimQueued marks the oldest queued rows as 'sending' and returns them.
// FOR UPDATE SKIP LOCKED skips rows that are locked by another transaction.
//
// Rows read before an iteration error are still returned so that they get
// processed. A row that fails to scan cannot be identified and therefore stays
// in 'sending'; nothing in this file resets it.
func (p *OutboxProcessor) claimQueued(ctx context.Context) []claimedOutboxRow {
	rows, err := p.db.Query(ctx, `
		UPDATE outbox SET status = 'sending', updated_at = NOW()
		WHERE id IN (
			SELECT id FROM outbox
			WHERE status = 'queued'
			ORDER BY created_at ASC
			LIMIT 10
			FOR UPDATE SKIP LOCKED
		)
		RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header
	`)
	if err != nil {
		p.logger.Error("failed to query outbox", "error", err)
		return nil
	}
	defer rows.Close()

	var batch []claimedOutboxRow
	for rows.Next() {
		var (
			msg     claimedOutboxRow
			toJSON  []byte
			ccJSON  []byte
			bccJSON []byte
		)
		if err := rows.Scan(&msg.id, &msg.accountID, &toJSON, &ccJSON, &bccJSON, &msg.subject, &msg.bodyText, &msg.bodyHTML, &msg.inReplyTo, &msg.references); err != nil {
			p.logger.Error("scan outbox row", "error", err)
			continue
		}
		msg.to = parseJSONAddresses(toJSON)
		msg.cc = parseJSONAddresses(ccJSON)
		msg.bcc = parseJSONAddresses(bccJSON)
		batch = append(batch, msg)
	}
	if err := rows.Err(); err != nil {
		p.logger.Error("iterate outbox rows", "error", err)
	}
	return batch
}

// deliver sends one claimed row and records the outcome on the row: 'sent' on
// success, back to 'queued' with the error text and an incremented retry count
// on failure. A failure to resolve the From address is requeued the same way
// instead of leaving the row in 'sending'.
func (p *OutboxProcessor) deliver(ctx context.Context, msg claimedOutboxRow) {
	fromEmail, err := p.resolveFrom(ctx, msg.accountID)
	if err != nil {
		p.logger.Error("resolve from address", "outbox_id", msg.id, "account_id", msg.accountID, "error", err)
		p.markRetry(ctx, msg.id, "resolve from address: "+err.Error())
		return
	}

	req := &SendRequest{
		AccountID:  msg.accountID,
		From:       fromEmail,
		To:         msg.to,
		Cc:         msg.cc,
		Bcc:        msg.bcc,
		Subject:    msg.subject,
		BodyText:   msg.bodyText,
		BodyHTML:   msg.bodyHTML,
		InReplyTo:  msg.inReplyTo,
		References: msg.references,
	}

	if err := p.sender.Send(ctx, req); err != nil {
		p.logger.Error("send failed", "outbox_id", msg.id, "error", err)
		observability.IncOutboxProcessed("error")
		p.markRetry(ctx, msg.id, err.Error())
		return
	}

	observability.IncOutboxProcessed("success")
	if _, err := p.db.Exec(ctx, `
		UPDATE outbox SET status = 'sent', sent_at = NOW(), updated_at = NOW()
		WHERE id = $1
	`, msg.id); err != nil {
		p.logger.Error("failed to mark outbox sent", "outbox_id", msg.id, "error", err)
	}
}

// resolveFrom returns the sender address for accountID: the account's default
// mail identity when one exists with a non-empty email, otherwise the email
// stored on the mail_accounts row. An error is returned only when the fallback
// lookup fails.
func (p *OutboxProcessor) resolveFrom(ctx context.Context, accountID string) (string, error) {
	var email string
	err := p.db.QueryRow(ctx, `
		SELECT mi.email FROM mail_identities mi
		JOIN mail_accounts ma ON mi.account_id = ma.id
		WHERE ma.id = $1 AND mi.is_default = true
		LIMIT 1
	`, accountID).Scan(&email)
	if err == nil && email != "" {
		return email, nil
	}

	if err := p.db.QueryRow(ctx, `SELECT email FROM mail_accounts WHERE id = $1`, accountID).Scan(&email); err != nil {
		return "", err
	}
	return email, nil
}

// markRetry puts the row back to 'queued', increments retry_count and stores
// reason in the error column. A failure to update is logged, not returned.
func (p *OutboxProcessor) markRetry(ctx context.Context, id, reason string) {
	if _, err := p.db.Exec(ctx, `
		UPDATE outbox SET status = 'queued', retry_count = retry_count + 1, error = $2, updated_at = NOW()
		WHERE id = $1
	`, id, reason); err != nil {
		p.logger.Error("failed to mark outbox retry", "outbox_id", id, "error", err)
	}
}

// processScheduled moves scheduled rows whose scheduled_at has passed to
// 'queued' so the next processQueue call can claim them.
func (p *OutboxProcessor) processScheduled(ctx context.Context) {
	result, err := p.db.Exec(ctx, `
		UPDATE outbox SET status = 'queued', updated_at = NOW()
		WHERE status = 'scheduled' AND scheduled_at IS NOT NULL AND scheduled_at <= NOW()
	`)
	if err != nil {
		p.logger.Error("failed to process scheduled", "error", err)
		return
	}
	if n := result.RowsAffected(); n > 0 {
		p.logger.Info("promoted scheduled outbox rows", "count", n)
	}
}

// updateQueueDepth counts rows that are queued, scheduled or sending and
// publishes the total through observability.SetOutboxQueueDepth.
func (p *OutboxProcessor) updateQueueDepth(ctx context.Context) {
	var count int64
	err := p.db.QueryRow(ctx, `
		SELECT COUNT(*) FROM outbox
		WHERE status IN ('queued', 'scheduled', 'sending')
	`).Scan(&count)
	if err != nil {
		p.logger.Error("failed to compute outbox queue depth", "error", err)
		return
	}
	observability.SetOutboxQueueDepth(count)
}

// parseJSONAddresses decodes a JSON array of addresses. It accepts either an
// array of objects with an "address" field or a plain array of strings, and
// returns nil when data matches neither form.
func parseJSONAddresses(data []byte) []string {
	var addrs []struct {
		Address string `json:"address"`
	}
	if err := json.Unmarshal(data, &addrs); err != nil {
		// Not an array of objects; try a plain string array.
		var plain []string
		if err := json.Unmarshal(data, &plain); err != nil {
			return nil
		}
		return plain
	}

	result := make([]string, 0, len(addrs))
	for _, a := range addrs {
		result = append(result, a.Address)
	}
	return result
}