ultisuite-backend/internal/mail/imap/sync.go
R3D347HR4Y 2057ccd816 Add observability features with Prometheus and Grafana integration
- Introduced health checks for Nextcloud, Immich, and Jitsi in the .env.example file.
- Implemented Prometheus metrics for HTTP requests, IMAP sync, outbox processing, and webhook executions.
- Added Grafana configuration files for dashboards and data sources.
- Updated Docker Compose to include Prometheus and Grafana services.
- Enhanced logging middleware to include request IDs and metrics tracking.
- Created health checker for monitoring database and external service statuses.
- Updated README with observability setup instructions and service URLs.
2026-05-22 16:17:10 +02:00

299 lines
7.4 KiB
Go

package imap
import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"time"
"github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/credentials"
"github.com/ultisuite/ulti-backend/internal/observability"
)
// SyncWorker periodically syncs the IMAP mailboxes of all active mail
// accounts into the database. Create one with NewSyncWorker and run it with
// Start.
type SyncWorker struct {
	// db is the connection pool used for every query the worker issues.
	db *pgxpool.Pool

	// logger is the structured logger used for all worker output.
	logger *slog.Logger

	// interval is the period of the ticker that triggers sync cycles.
	interval time.Duration

	// credentials decrypts stored account credentials; when nil, credential
	// parsing fails with an error.
	credentials *credentials.Manager
}
// NewSyncWorker builds a SyncWorker that uses db for storage, runs a sync
// cycle every interval, and decrypts account credentials with credManager.
// The worker logs through slog.Default() tagged with component=imap-sync.
func NewSyncWorker(db *pgxpool.Pool, interval time.Duration, credManager *credentials.Manager) *SyncWorker {
	worker := new(SyncWorker)
	worker.db = db
	worker.logger = slog.Default().With("component", "imap-sync")
	worker.interval = interval
	worker.credentials = credManager
	return worker
}
// Start runs one sync cycle immediately and then another on every tick of
// w.interval. It blocks until ctx is cancelled, at which point it logs that
// the worker stopped and returns.
func (w *SyncWorker) Start(ctx context.Context) {
	tick := time.NewTicker(w.interval)
	defer tick.Stop()

	w.logger.Info("imap sync worker started", "interval", w.interval)

	// Do not wait a full interval before the first sync.
	w.runSyncCycle(ctx)

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			w.runSyncCycle(ctx)
			continue
		case <-done:
		}
		w.logger.Info("imap sync worker stopped")
		return
	}
}
// runSyncCycle performs one full pass over all accounts and records its
// duration through observability.ObserveIMAPSync, labelled "success" or
// "error". A failed pass is additionally logged.
func (w *SyncWorker) runSyncCycle(ctx context.Context) {
	began := time.Now()
	err := w.syncAllAccounts(ctx)
	if err == nil {
		observability.ObserveIMAPSync("success", time.Since(began))
		return
	}
	observability.ObserveIMAPSync("error", time.Since(began))
	w.logger.Error("sync cycle failed", "error", err)
}
// syncAllAccounts loads every active row of mail_accounts and syncs the
// accounts one after another.
//
// The rows are buffered in memory and the result set is closed before any
// IMAP work starts, so no pooled database connection stays checked out while
// the worker talks to remote servers (syncAccount itself needs the pool).
//
// A row that fails to scan, or an account that fails to sync, is logged and
// skipped; the remaining accounts are still processed. The function returns:
//   - the wrapped query error if the accounts could not be queried,
//   - ctx.Err() if ctx is cancelled between two accounts,
//   - the wrapped iteration error if reading the result set failed,
//   - a generic error if at least one account failed,
//   - nil otherwise.
func (w *SyncWorker) syncAllAccounts(ctx context.Context) error {
	type account struct {
		id        string
		host      string
		port      int
		useTLS    bool
		creds     []byte
		syncState []byte
	}

	rows, err := w.db.Query(ctx, `
		SELECT id, imap_host, imap_port, imap_tls, credentials, sync_state
		FROM mail_accounts
		WHERE is_active = true
	`)
	if err != nil {
		// The caller logs the returned error; logging here too would duplicate it.
		return fmt.Errorf("query accounts: %w", err)
	}

	var accounts []account
	hasSyncError := false
	for rows.Next() {
		var a account
		if err := rows.Scan(&a.id, &a.host, &a.port, &a.useTLS, &a.creds, &a.syncState); err != nil {
			w.logger.Error("failed to scan account", "error", err)
			hasSyncError = true
			continue
		}
		accounts = append(accounts, a)
	}
	// Release the connection before the slow network phase below.
	rows.Close()
	rowsErr := rows.Err()

	for _, a := range accounts {
		// Stop promptly on shutdown instead of dialing every remaining server.
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := w.syncAccount(ctx, a.id, a.host, a.port, a.useTLS, a.creds, a.syncState); err != nil {
			w.logger.Error("sync failed", "account_id", a.id, "error", err)
			hasSyncError = true
		}
	}

	// Accounts read before an iteration failure were still synced above; the
	// iteration error takes precedence over the per-account summary error.
	if rowsErr != nil {
		return fmt.Errorf("iterate accounts: %w", rowsErr)
	}
	if hasSyncError {
		return errors.New("one or more account sync failed")
	}
	return nil
}
// syncAccount syncs every mailbox of a single account and then stamps the
// account's last_sync_at column.
//
// Parameters:
//   - accountID: primary key of the mail_accounts row.
//   - host, port: address of the IMAP server.
//   - useTLS: when true the connection is opened with DialTLS, otherwise with
//     DialStartTLS.
//   - creds: encrypted credentials, decoded through parseCredentials.
//   - syncState: currently unused by this function.
//
// Credentials are decrypted before dialing so that an account with unusable
// credentials fails without opening a network connection. A failure in one
// mailbox is logged and the remaining mailboxes are still processed. The
// function returns early with ctx.Err() if ctx is cancelled between two
// mailboxes, because the IMAP calls themselves do not observe ctx.
func (w *SyncWorker) syncAccount(ctx context.Context, accountID, host string, port int, useTLS bool, creds, syncState []byte) error {
	username, password, err := w.parseCredentials(creds)
	if err != nil {
		return fmt.Errorf("decrypt credentials: %w", err)
	}

	addr := fmt.Sprintf("%s:%d", host, port)
	opts := &imapclient.Options{}

	var client *imapclient.Client
	if useTLS {
		client, err = imapclient.DialTLS(addr, opts)
	} else {
		client, err = imapclient.DialStartTLS(addr, opts)
	}
	if err != nil {
		return fmt.Errorf("dial: %w", err)
	}
	defer client.Close()

	if err := client.Login(username, password).Wait(); err != nil {
		return fmt.Errorf("login: %w", err)
	}

	// List all mailboxes of the account.
	mailboxes, err := client.List("", "*", nil).Collect()
	if err != nil {
		return fmt.Errorf("list: %w", err)
	}

	for _, mbox := range mailboxes {
		// Without this check a cancelled worker would still walk every
		// remaining mailbox, failing each database call along the way.
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := w.syncFolder(ctx, client, accountID, mbox.Mailbox); err != nil {
			w.logger.Error("folder sync failed", "account_id", accountID, "folder", mbox.Mailbox, "error", err)
		}
	}

	// Record the time of this sync pass.
	if _, err := w.db.Exec(ctx, `UPDATE mail_accounts SET last_sync_at = NOW() WHERE id = $1`, accountID); err != nil {
		return fmt.Errorf("update last sync time: %w", err)
	}
	return nil
}
// syncFolder selects one mailbox, upserts its mail_folders row and stores
// every message whose UID is higher than the highest UID already saved for
// that folder.
//
// Behavior worth knowing:
//   - The body is requested with Peek set (BODY.PEEK[]), so syncing does not
//     set the \Seen flag on the server.
//   - A failure to read the highest stored UID is returned instead of being
//     treated as "no messages stored", which would re-download the mailbox.
//   - If the server's UIDNEXT shows that nothing newer than the stored UID
//     exists, no FETCH is issued. This matters because the range "n:*" always
//     matches the newest message, even when n is above every existing UID.
//   - A message that fails to process is logged and skipped.
//   - When ctx is cancelled the fetch is abandoned and ctx.Err() is returned.
//
// NOTE(review): the upsert overwrites the stored uidvalidity without comparing
// it to the previous value, so a UIDVALIDITY change on the server is not
// detected here — confirm whether that is handled elsewhere.
func (w *SyncWorker) syncFolder(ctx context.Context, client *imapclient.Client, accountID, folderName string) error {
	selectData, err := client.Select(folderName, nil).Wait()
	if err != nil {
		return fmt.Errorf("select %s: %w", folderName, err)
	}

	// Upsert the folder record and learn its id.
	var folderID string
	err = w.db.QueryRow(ctx, `
		INSERT INTO mail_folders (account_id, name, remote_name, uidvalidity, message_count)
		VALUES ($1, $2, $2, $3, $4)
		ON CONFLICT (account_id, remote_name) DO UPDATE
		SET uidvalidity = EXCLUDED.uidvalidity,
		    message_count = EXCLUDED.message_count,
		    updated_at = NOW()
		RETURNING id
	`, accountID, folderName, selectData.UIDValidity, selectData.NumMessages).Scan(&folderID)
	if err != nil {
		return fmt.Errorf("upsert folder: %w", err)
	}

	if selectData.NumMessages == 0 {
		return nil
	}

	// Highest UID already stored for this folder (0 when none).
	var lastUID uint32
	err = w.db.QueryRow(ctx, `
		SELECT COALESCE(MAX(uid), 0) FROM messages WHERE folder_id = $1
	`, folderID).Scan(&lastUID)
	if err != nil {
		return fmt.Errorf("load last uid: %w", err)
	}

	// Every existing message has a UID below UIDNEXT. A zero UIDNEXT means the
	// server did not report one, in which case we fall through and fetch.
	if selectData.UIDNext != 0 && imap.UID(lastUID)+1 >= selectData.UIDNext {
		return nil
	}

	// Fetch lastUID+1 through "*" (UID 0 encodes "*").
	uidSet := imap.UIDSet{}
	uidSet.AddRange(imap.UID(lastUID+1), imap.UID(0))

	fetchOpts := &imap.FetchOptions{
		UID:      true,
		Flags:    true,
		Envelope: true,
		// Peek keeps the fetch from marking messages as read.
		BodySection: []*imap.FetchItemBodySection{{Peek: true}},
	}

	fetchCmd := client.Fetch(uidSet, fetchOpts)
	for ctx.Err() == nil {
		msg := fetchCmd.Next()
		if msg == nil {
			break
		}
		if err := w.processMessage(ctx, msg, accountID, folderID); err != nil {
			w.logger.Error("process message failed", "folder", folderName, "error", err)
		}
	}
	// Close must run even after an early exit so the client can move on to
	// the next command.
	if err := fetchCmd.Close(); err != nil {
		return fmt.Errorf("fetch %s: %w", folderName, err)
	}
	return ctx.Err()
}
// processMessage drains the items of one fetched message and inserts it into
// the messages table. A row that already exists for (folder_id, uid) is left
// untouched.
//
// The body literal is read in full and its length is checked against the size
// the server announced. A short read returns an error instead of storing a
// truncated body: because the insert is ON CONFLICT DO NOTHING, a truncated
// row would never be repaired by a later sync.
//
// A message without an envelope is skipped and nil is returned.
func (w *SyncWorker) processMessage(ctx context.Context, msg *imapclient.FetchMessageData, accountID, folderID string) error {
	var (
		envelope    *imap.Envelope
		uid         imap.UID
		flags       []imap.Flag
		bodyContent []byte
	)

	for {
		item := msg.Next()
		if item == nil {
			break
		}
		switch data := item.(type) {
		case imapclient.FetchItemDataUID:
			uid = data.UID
		case imapclient.FetchItemDataFlags:
			flags = data.Flags
		case imapclient.FetchItemDataEnvelope:
			envelope = data.Envelope
		case imapclient.FetchItemDataBodySection:
			if data.Literal == nil {
				continue
			}
			want := data.Literal.Size()
			buf := make([]byte, 0, 4096)
			chunk := make([]byte, 4096)
			var readErr error
			for readErr == nil {
				var n int
				n, readErr = data.Literal.Read(chunk)
				buf = append(buf, chunk[:n]...)
			}
			// The loop ends on any error, including the normal end-of-data
			// one; only a length mismatch means the body is incomplete.
			if int64(len(buf)) != want {
				return fmt.Errorf("read body: got %d of %d bytes: %w", len(buf), want, readErr)
			}
			bodyContent = buf
		}
	}

	if envelope == nil {
		return nil
	}

	flagStrs := make([]string, len(flags))
	for i, f := range flags {
		flagStrs[i] = string(f)
	}

	fromAddr := addressesToJSON(envelope.From)
	toAddrs := addressesToJSON(envelope.To)
	ccAddrs := addressesToJSON(envelope.Cc)
	bodyText, bodyHTML := parseBody(bodyContent)
	snippet := truncate(bodyText, 200)

	_, err := w.db.Exec(ctx, `
		INSERT INTO messages (account_id, folder_id, uid, message_id, subject, from_addr, to_addrs, cc_addrs, date, snippet, body_text, body_html, flags, in_reply_to)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
		ON CONFLICT (folder_id, uid) DO NOTHING
	`, accountID, folderID, uid, envelope.MessageID, envelope.Subject,
		fromAddr, toAddrs, ccAddrs, envelope.Date, snippet, bodyText, bodyHTML, flagStrs, strings.Join(envelope.InReplyTo, " "))
	if err != nil {
		return fmt.Errorf("insert message uid %d: %w", uid, err)
	}
	return nil
}
// parseCredentials validates the stored credential blob and decrypts it with
// the worker's credential manager, returning whatever Decrypt yields.
//
// It fails, in this order, when the blob is empty, when it is not encrypted
// (plaintext credentials are rejected), or when no credential manager was
// configured.
func (w *SyncWorker) parseCredentials(creds []byte) (string, string, error) {
	switch {
	case len(creds) == 0:
		return "", "", errors.New("missing credentials")
	case !credentials.IsEncrypted(creds):
		return "", "", errors.New("plaintext credentials forbidden")
	case w.credentials == nil:
		return "", "", errors.New("credential manager not configured")
	}
	return w.credentials.Decrypt(creds)
}
// splitBytes cuts data at every occurrence of sep and returns the pieces in
// order, without the separators. The result always has at least one element:
// input without any separator (including empty input) yields a single piece.
// The pieces are sub-slices of data, not copies.
func splitBytes(data []byte, sep byte) [][]byte {
	var pieces [][]byte
	begin := 0
	for idx := 0; idx < len(data); idx++ {
		if data[idx] != sep {
			continue
		}
		pieces = append(pieces, data[begin:idx])
		begin = idx + 1
	}
	// Whatever follows the last separator (possibly nothing) is the final piece.
	return append(pieces, data[begin:])
}
// truncate returns s shortened to at most maxLen bytes.
//
// The cut never lands inside a multi-byte UTF-8 sequence: if byte maxLen is a
// continuation byte, the cut moves back to the start of that rune, so the
// result may be a few bytes shorter than maxLen but stays valid UTF-8 whenever
// s is. A non-positive maxLen yields the empty string.
func truncate(s string, maxLen int) string {
	if maxLen <= 0 {
		return ""
	}
	if len(s) <= maxLen {
		return s
	}
	cut := maxLen
	// UTF-8 continuation bytes have the bit pattern 10xxxxxx.
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut]
}