package smtp import ( "context" "encoding/json" "log/slog" "time" "github.com/jackc/pgx/v5/pgxpool" ) type OutboxProcessor struct { db *pgxpool.Pool sender *Sender logger *slog.Logger interval time.Duration } func NewOutboxProcessor(db *pgxpool.Pool, sender *Sender, interval time.Duration) *OutboxProcessor { return &OutboxProcessor{ db: db, sender: sender, logger: slog.Default().With("component", "outbox"), interval: interval, } } func (p *OutboxProcessor) Start(ctx context.Context) { ticker := time.NewTicker(p.interval) defer ticker.Stop() p.logger.Info("outbox processor started", "interval", p.interval) for { select { case <-ctx.Done(): p.logger.Info("outbox processor stopped") return case <-ticker.C: p.processQueue(ctx) p.processScheduled(ctx) } } } func (p *OutboxProcessor) processQueue(ctx context.Context) { rows, err := p.db.Query(ctx, ` UPDATE outbox SET status = 'sending', updated_at = NOW() WHERE id IN ( SELECT id FROM outbox WHERE status = 'queued' ORDER BY created_at ASC LIMIT 10 FOR UPDATE SKIP LOCKED ) RETURNING id, account_id, to_addrs, cc_addrs, bcc_addrs, subject, body_text, body_html, in_reply_to, references_header `) if err != nil { p.logger.Error("failed to query outbox", "error", err) return } defer rows.Close() for rows.Next() { var ( id string accountID string toJSON []byte ccJSON []byte bccJSON []byte subject string bodyText string bodyHTML string inReplyTo string references []string ) if err := rows.Scan(&id, &accountID, &toJSON, &ccJSON, &bccJSON, &subject, &bodyText, &bodyHTML, &inReplyTo, &references); err != nil { p.logger.Error("scan outbox row", "error", err) continue } to := parseJSONAddresses(toJSON) cc := parseJSONAddresses(ccJSON) bcc := parseJSONAddresses(bccJSON) // Get the from address var fromEmail string _ = p.db.QueryRow(ctx, ` SELECT mi.email FROM mail_identities mi JOIN mail_accounts ma ON mi.account_id = ma.id WHERE ma.id = $1 AND mi.is_default = true LIMIT 1 `, accountID).Scan(&fromEmail) if fromEmail == "" { _ = p.db.QueryRow(ctx, `SELECT email FROM mail_accounts WHERE id = $1`, accountID).Scan(&fromEmail) } req := &SendRequest{ AccountID: accountID, From: fromEmail, To: to, Cc: cc, Bcc: bcc, Subject: subject, BodyText: bodyText, BodyHTML: bodyHTML, InReplyTo: inReplyTo, References: references, } if err := p.sender.Send(ctx, req); err != nil { p.logger.Error("send failed", "outbox_id", id, "error", err) _, _ = p.db.Exec(ctx, ` UPDATE outbox SET status = 'queued', retry_count = retry_count + 1, error = $2, updated_at = NOW() WHERE id = $1 `, id, err.Error()) } else { _, _ = p.db.Exec(ctx, ` UPDATE outbox SET status = 'sent', sent_at = NOW(), updated_at = NOW() WHERE id = $1 `, id) } } } func (p *OutboxProcessor) processScheduled(ctx context.Context) { _, err := p.db.Exec(ctx, ` UPDATE outbox SET status = 'queued', updated_at = NOW() WHERE status = 'queued' AND scheduled_at IS NOT NULL AND scheduled_at <= NOW() `) if err != nil { p.logger.Error("failed to process scheduled", "error", err) } } func parseJSONAddresses(data []byte) []string { var addrs []struct { Address string `json:"address"` } if err := json.Unmarshal(data, &addrs); err != nil { // Try as plain string array var plain []string if err := json.Unmarshal(data, &plain); err == nil { return plain } return nil } result := make([]string, 0, len(addrs)) for _, a := range addrs { result = append(result, a.Address) } return result }