ultisuite-backend/internal/migration/gmail_import.go
R3D347HR4Y 951c88b1ca
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
feat(migration): graph childFolders, parent FK, B2B hardening
- Graph mail: discover nested childFolders, merge new folders into
  cached graphFolderQueue without breaking in-progress cursors
- Add mail_folders.parent_id (migration 000050) and wire hierarchy on import
- Shared drives: skip discovery on delta ticks, guard merge by project
- Provision: remove platform-domain email rewrite on claim
- Integration tests for nested folders, parent_id, delta childFolders mocks
2026-06-13 13:16:36 +02:00

707 lines
19 KiB
Go

package migration
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"hash/fnv"
"net/http"
"strings"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/sanitize"
"github.com/ultisuite/ulti-backend/internal/mail/storage"
"github.com/ultisuite/ulti-backend/internal/mail/threading"
)
// GmailImporter copies a user's Gmail mailbox into the local mail tables
// through the Gmail REST API (see ImportBatch).
type GmailImporter struct {
	// db is the Postgres pool used for folders, messages and import bookkeeping.
	db *pgxpool.Pool
	// client performs Gmail API requests; NewGmailImporter sets a 90s timeout
	// and WithHTTPClient may replace it.
	client *http.Client
	// storage is the optional object-storage client configured by WithStorage
	// (presumably used for attachments — TODO confirm in storeGmailAttachments).
	storage *storage.Client
	// attachBucket is the whitespace-trimmed bucket name set by WithStorage.
	attachBucket string
}
// NewGmailImporter builds an importer backed by db whose Gmail requests use an
// HTTP client limited to 90 seconds each. Object storage is left unset; call
// WithStorage to configure it.
func NewGmailImporter(db *pgxpool.Pool) *GmailImporter {
	imp := new(GmailImporter)
	imp.db = db
	imp.client = &http.Client{Timeout: 90 * time.Second}
	return imp
}
// WithHTTPClient makes the importer issue Gmail API calls through c and
// returns g for chaining. A nil c is ignored and the current client is kept.
func (g *GmailImporter) WithHTTPClient(c *http.Client) *GmailImporter {
	if c == nil {
		return g
	}
	g.client = c
	return g
}
// WithStorage sets the object-storage client and bucket name used by the
// importer and returns g for chaining. The bucket name is whitespace-trimmed.
//
// The client parameter is deliberately not called "storage", which would
// shadow the imported storage package inside the method body.
func (g *GmailImporter) WithStorage(client *storage.Client, bucket string) *GmailImporter {
	g.storage = client
	g.attachBucket = strings.TrimSpace(bucket)
	return g
}
// gmailMessage holds the fields of a Gmail API message resource (fetched with
// format=full by importOne) that the importer reads.
type gmailMessage struct {
	ID       string `json:"id"`
	ThreadID string `json:"threadId"`
	// LabelIDs are Gmail label IDs; system labels (INBOX, SENT, UNREAD, ...)
	// select the local folder and flags, the others become user labels.
	LabelIDs []string `json:"labelIds"`
	Snippet  string   `json:"snippet"`
	// InternalDate is milliseconds since the Unix epoch as a decimal string
	// (see parseInternalDate); when parseable it overrides the Date header.
	InternalDate string       `json:"internalDate"`
	Payload      gmailPayload `json:"payload"`
}
// gmailPayload is one node of a message's MIME part tree as returned by the
// Gmail API; Parts holds the nested child parts.
type gmailPayload struct {
	MimeType string         `json:"mimeType"`
	Headers  []gmailHeader  `json:"headers"`
	Body     gmailBody      `json:"body"`
	Parts    []gmailPayload `json:"parts"`
}
// gmailHeader is a single header name/value pair attached to a gmailPayload.
type gmailHeader struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}
// gmailBody is the body of a MIME part. Data is base64url-encoded content
// (decoded by decodeGmailBody). AttachmentID presumably references content
// that has to be fetched separately — TODO confirm in storeGmailAttachments.
type gmailBody struct {
	Size         int    `json:"size"`
	Data         string `json:"data"`
	AttachmentID string `json:"attachmentId"`
}
// ImportBatch runs one bounded tick of a Gmail migration job and reports the
// result through update: "pending" when more work remains, "completed"
// otherwise.
//
// With delta set and a stored "historyId" cursor, it applies incremental
// changes through the Gmail history API (importHistory). If Gmail reports
// that history as not found, the cursor restarts from the mailbox's current
// historyId and the tick completes.
//
// Otherwise it performs, or resumes, a full listing. Up to
// mailImportBatchSize() messages of the current page are imported per tick.
// The position is kept in the cursor keys "pageToken" and "listIndex". After
// the last page, "historyId" is set so later ticks can run in delta mode.
//
// The delta starting point is captured when the full listing begins (cursor
// key "startHistoryId"), not when it ends. Messages that arrive or change
// while a multi-tick import is running are therefore replayed by the first
// delta sync instead of being skipped. Replayed messages that were already
// imported are de-duplicated by the item store and the upsert.
func (g *GmailImporter) ImportBatch(
	ctx context.Context,
	job *Job,
	accessToken string,
	delta bool,
	update func(status string, cursor, stats map[string]any, jobErr string) error,
) error {
	// Both maps are written below; a brand-new job must not panic on nil maps.
	if job.CursorJSON == nil {
		job.CursorJSON = map[string]any{}
	}
	if job.StatsJSON == nil {
		job.StatsJSON = map[string]any{}
	}
	accountID, err := g.resolveMailAccountID(ctx, job.UserID)
	if err != nil {
		return err
	}
	if err := ensureDefaultMailFolders(ctx, g.db, accountID); err != nil {
		return err
	}
	items, err := LoadImportedItemStore(ctx, g.db, job.ID, job.CursorJSON)
	if err != nil {
		return err
	}

	if delta {
		if historyID, _ := job.CursorJSON["historyId"].(string); historyID != "" {
			more, err := g.importHistory(ctx, job, accessToken, accountID, historyID, items)
			if err != nil {
				if isGmailHistoryNotFound(err) {
					// The stored history is no longer available: restart from the
					// mailbox's current state and drop all paging state of the
					// old listing, including the per-page record index.
					if newID, fetchErr := g.fetchHistoryID(ctx, accessToken); fetchErr == nil && newID != "" {
						job.CursorJSON["historyId"] = newID
						delete(job.CursorJSON, "historyPageToken")
						delete(job.CursorJSON, "historyListIndex")
						job.StatsJSON["history_reset"] = float64(1)
						job.StatsJSON["phase"] = "delta"
						return update("completed", job.CursorJSON, job.StatsJSON, "")
					}
				}
				return err
			}
			if more {
				return update("pending", job.CursorJSON, job.StatsJSON, "")
			}
			return update("completed", job.CursorJSON, job.StatsJSON, "")
		}
	}

	pageToken, _ := job.CursorJSON["pageToken"].(string)
	listIndex := int(jsonNumber(job.CursorJSON["listIndex"]))
	if pageToken == "" && listIndex == 0 {
		if _, ok := job.CursorJSON["startHistoryId"].(string); !ok {
			// Best effort: if this fails, the historyId is fetched once the
			// listing completes instead.
			if hid, err := g.fetchHistoryID(ctx, accessToken); err == nil && hid != "" {
				job.CursorJSON["startHistoryId"] = hid
			}
		}
	}

	listURL := "https://gmail.googleapis.com/gmail/v1/users/me/messages?maxResults=100"
	if pageToken != "" {
		listURL += "&pageToken=" + pageToken
	}
	body, err := g.apiGet(ctx, listURL, accessToken)
	if err != nil {
		return err
	}
	var listed struct {
		Messages []struct {
			ID string `json:"id"`
		} `json:"messages"`
		NextPageToken      string `json:"nextPageToken"`
		ResultSizeEstimate int    `json:"resultSizeEstimate"`
	}
	if err := json.Unmarshal(body, &listed); err != nil {
		return err
	}
	if listed.ResultSizeEstimate > 0 {
		job.StatsJSON["estimated_total"] = float64(listed.ResultSizeEstimate)
	}

	imported, _ := job.StatsJSON["imported"].(float64)
	limit := mailImportBatchSize()
	batch := 0
	for i := listIndex; i < len(listed.Messages) && batch < limit; i++ {
		msgID := listed.Messages[i].ID
		listIndex = i + 1
		// Already-imported messages are skipped without consuming budget.
		if alreadyImported(items, msgID) {
			continue
		}
		created, err := g.importOne(ctx, accessToken, job.UserID, accountID, msgID)
		batch++
		if err != nil {
			if markErr := items.MarkFailed(ctx, msgID, err.Error(), ""); markErr != nil {
				return markErr
			}
			incJobStat(job.StatsJSON, "failed")
			continue
		}
		if err := items.MarkImported(ctx, msgID); err != nil {
			return err
		}
		if created {
			imported++
		}
	}
	job.StatsJSON["imported"] = imported
	job.CursorJSON["listIndex"] = float64(listIndex)
	if listIndex < len(listed.Messages) {
		return update("pending", job.CursorJSON, job.StatsJSON, "")
	}

	// Page complete.
	delete(job.CursorJSON, "listIndex")
	if listed.NextPageToken != "" {
		job.CursorJSON["pageToken"] = listed.NextPageToken
		return update("pending", job.CursorJSON, job.StatsJSON, "")
	}

	// Listing complete: hand over to delta mode from the id captured when the
	// listing started, falling back to the current one.
	delete(job.CursorJSON, "pageToken")
	historyID, _ := job.CursorJSON["startHistoryId"].(string)
	delete(job.CursorJSON, "startHistoryId")
	if historyID == "" {
		if hid, err := g.fetchHistoryID(ctx, accessToken); err == nil {
			historyID = hid
		}
	}
	if historyID != "" {
		job.CursorJSON["historyId"] = historyID
	}
	job.StatsJSON["phase"] = "imported"
	return update("completed", job.CursorJSON, job.StatsJSON, "")
}
// jsonNumber converts a numeric cursor/stats value to float64. It returns 0
// for nil and for non-numeric values.
//
// Values decoded from JSON are float64 (or json.Number when a decoder uses
// UseNumber). Cursors filled in by Go code may hold plain integers, so those
// are accepted as well instead of being silently read as 0.
func jsonNumber(v any) float64 {
	switch n := v.(type) {
	case float64:
		return n
	case float32:
		return float64(n)
	case int:
		return float64(n)
	case int32:
		return float64(n)
	case int64:
		return float64(n)
	case json.Number:
		if f, err := n.Float64(); err == nil {
			return f
		}
	}
	return 0
}
// importOne fetches one Gmail message (format=full) and upserts it into the
// local folder matching its primary Gmail label. It then applies threading
// and stores its attachments.
//
// Rows are keyed by (folder_id, uid), where uid is a stable hash of the Gmail
// ID (gmailUID). A label change such as archiving can move a message to a
// different primary folder. Any copy of the same message left in another
// folder of the account is therefore removed after the upsert, so re-imports
// driven by label changes do not leave duplicates behind.
//
// It returns true only when the message did not previously exist in any
// folder of the account, so a folder move is not counted as a new message.
func (g *GmailImporter) importOne(ctx context.Context, accessToken, userID, accountID, gmailID string) (bool, error) {
	raw, err := g.apiGet(ctx, "https://gmail.googleapis.com/gmail/v1/users/me/messages/"+gmailID+"?format=full", accessToken)
	if err != nil {
		return false, err
	}
	var msg gmailMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return false, err
	}
	remoteName, folderType := primaryGmailFolder(msg.LabelIDs)
	folderID, err := ensureMailFolder(ctx, g.db, accountID, displayFolderName(remoteName, folderType), remoteName, folderType, nil)
	if err != nil {
		return false, err
	}

	headers := indexHeaders(msg.Payload)
	subject := headers["subject"]
	fromJSON := parseAddressListJSON(headers["from"])
	toJSON := parseAddressListJSON(headers["to"])
	ccJSON := parseAddressListJSON(headers["cc"])
	replyToJSON := parseAddressListJSON(headers["reply-to"])
	rfcID := threading.NormalizeMessageID(headers["message-id"])
	if rfcID == "" {
		// Synthesize a stable Message-ID so threading still has a key.
		rfcID = threading.NormalizeMessageID("<gmail-" + gmailID + "@ultimail.migrated>")
	}
	inReplyTo := threading.NormalizeMessageID(headers["in-reply-to"])
	references := parseReferences(headers["references"])
	bodyText, bodyHTML := extractGmailBodies(msg.Payload)
	snippet := strings.TrimSpace(msg.Snippet)
	if snippet == "" {
		snippet = truncateRunes(bodyText, 200)
	}
	date := parseMailDate(headers["date"])
	if msg.InternalDate != "" {
		if ms, err := parseInternalDate(msg.InternalDate); err == nil {
			date = ms
		}
	}
	flags := gmailFlags(msg.LabelIDs)
	labels := gmailUserLabels(msg.LabelIDs)
	uid := gmailUID(gmailID)

	// existedInFolder: a row already sits in the target folder, so its
	// attachments were stored before. existedInAccount: the message exists in
	// some folder of the account. Both use the (folder_id, uid) index required
	// by the upsert's ON CONFLICT target.
	var existedInFolder, existedInAccount bool
	if err := g.db.QueryRow(ctx, `
SELECT
EXISTS(SELECT 1 FROM messages WHERE folder_id = $1 AND uid = $2),
EXISTS(SELECT 1 FROM messages WHERE uid = $2 AND folder_id IN (SELECT id FROM mail_folders WHERE account_id = $3))
`, folderID, uid, accountID).Scan(&existedInFolder, &existedInAccount); err != nil {
		return false, fmt.Errorf("check existing gmail message %s: %w", gmailID, err)
	}

	var messageID string
	err = g.db.QueryRow(ctx, `
INSERT INTO messages (
account_id, folder_id, uid, message_id, subject,
from_addr, to_addrs, cc_addrs, reply_to,
date, snippet, body_text, body_html, flags, labels,
in_reply_to, references_header
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17)
ON CONFLICT (folder_id, uid) DO UPDATE SET
message_id = EXCLUDED.message_id,
subject = EXCLUDED.subject,
from_addr = EXCLUDED.from_addr,
to_addrs = EXCLUDED.to_addrs,
cc_addrs = EXCLUDED.cc_addrs,
reply_to = EXCLUDED.reply_to,
date = EXCLUDED.date,
snippet = EXCLUDED.snippet,
body_text = EXCLUDED.body_text,
body_html = EXCLUDED.body_html,
flags = EXCLUDED.flags,
labels = EXCLUDED.labels,
in_reply_to = EXCLUDED.in_reply_to,
references_header = EXCLUDED.references_header,
updated_at = NOW()
RETURNING id
`, accountID, folderID, uid, rfcID, subject,
		fromJSON, toJSON, ccJSON, replyToJSON,
		date, snippet, bodyText, sanitize.SanitizeHTML(bodyHTML), flags, labels,
		inReplyTo, references,
	).Scan(&messageID)
	if err != nil {
		return false, err
	}

	if existedInAccount {
		// Remove copies left in other folders by an earlier primary-folder
		// assignment, before threading so it only sees the current row.
		if _, err := g.db.Exec(ctx, `
DELETE FROM messages
WHERE uid = $1
AND folder_id <> $2
AND folder_id IN (SELECT id FROM mail_folders WHERE account_id = $3)
`, uid, folderID, accountID); err != nil {
			return false, fmt.Errorf("remove stale folder copies of gmail message %s: %w", gmailID, err)
		}
	}

	if err := threading.ApplyMessageThread(ctx, g.db, accountID, messageID, rfcID, inReplyTo, references); err != nil {
		return false, err
	}
	if err := g.storeGmailAttachments(ctx, userID, messageID, gmailID, accessToken, msg.Payload, existedInFolder); err != nil {
		return false, err
	}
	return !existedInAccount, nil
}
// importHistory applies one bounded tick of Gmail incremental sync starting at
// historyID. It reports more=true when the caller should schedule another
// tick.
//
// Progress is kept in job.CursorJSON:
//   - "historyPageToken": the page of the history listing to fetch next.
//   - "historyListIndex": the first record of that page not yet applied.
//   - "historyId": advanced to the response's historyId only after the final
//     page has been applied. While paging, the start id stays fixed so the
//     page token keeps continuing the same listing.
//
// Whole history records are applied until mailImportBatchSize() entries have
// been handled. At least one record is applied per tick, so a record larger
// than the budget cannot stall progress, and stopping exactly between records
// never drops the rest of the page. Messages that cannot be fetched (for
// added or label-changed entries) are recorded via MarkFailed and the
// "failed" stat rather than aborting the tick, so one unfetchable message
// cannot block the sync. Counters "delta_imported" and "delta_deleted" are
// saved on every progress-reporting return.
func (g *GmailImporter) importHistory(ctx context.Context, job *Job, accessToken, accountID, historyID string, items *ImportedItemStore) (more bool, err error) {
	pageToken, _ := job.CursorJSON["historyPageToken"].(string)
	listURL := fmt.Sprintf(
		"https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=%s&maxResults=100&historyTypes=messageAdded&historyTypes=messageDeleted&historyTypes=labelAdded&historyTypes=labelRemoved",
		historyID,
	)
	if pageToken != "" {
		listURL += "&pageToken=" + pageToken
	}
	body, err := g.apiGet(ctx, listURL, accessToken)
	if err != nil {
		return false, err
	}
	type messageRef struct {
		Message struct {
			ID string `json:"id"`
		} `json:"message"`
	}
	var parsed struct {
		History []struct {
			MessagesAdded   []messageRef `json:"messagesAdded"`
			MessagesDeleted []messageRef `json:"messagesDeleted"`
			LabelsAdded     []messageRef `json:"labelsAdded"`
			LabelsRemoved   []messageRef `json:"labelsRemoved"`
		} `json:"history"`
		NextPageToken string `json:"nextPageToken"`
		HistoryID     string `json:"historyId"`
	}
	if err := json.Unmarshal(body, &parsed); err != nil {
		return false, err
	}

	deltaCount, _ := job.StatsJSON["delta_imported"].(float64)
	deleted, _ := job.StatsJSON["delta_deleted"].(float64)
	saveStats := func() {
		job.StatsJSON["delta_imported"] = deltaCount
		job.StatsJSON["delta_deleted"] = deleted
		job.StatsJSON["phase"] = "delta"
	}
	// recordFailure marks msgID as failed and bumps the "failed" stat; only a
	// bookkeeping error is propagated.
	recordFailure := func(msgID string, cause error) error {
		if err := items.MarkFailed(ctx, msgID, cause.Error(), ""); err != nil {
			return err
		}
		incJobStat(job.StatsJSON, "failed")
		return nil
	}

	limit := mailImportBatchSize()
	handled := 0
	next := int(jsonNumber(job.CursorJSON["historyListIndex"]))
	for ; next < len(parsed.History); next++ {
		if handled > 0 && handled >= limit {
			// Budget spent between two records: resume at this record.
			job.CursorJSON["historyListIndex"] = float64(next)
			saveStats()
			return true, nil
		}
		record := parsed.History[next]

		for _, added := range record.MessagesAdded {
			handled++
			msgID := added.Message.ID
			if alreadyImported(items, msgID) {
				continue
			}
			created, err := g.importOne(ctx, accessToken, job.UserID, accountID, msgID)
			if err != nil {
				if markErr := recordFailure(msgID, err); markErr != nil {
					return false, markErr
				}
				continue
			}
			if err := items.MarkImported(ctx, msgID); err != nil {
				return false, err
			}
			if created {
				deltaCount++
			}
		}

		for _, removed := range record.MessagesDeleted {
			handled++
			if err := g.deleteByGmailID(ctx, accountID, removed.Message.ID); err != nil {
				return false, err
			}
			deleted++
		}

		// Label changes re-import the message so folder, flags and labels
		// follow Gmail.
		for _, changes := range [][]messageRef{record.LabelsAdded, record.LabelsRemoved} {
			for _, changed := range changes {
				handled++
				msgID := changed.Message.ID
				if _, err := g.importOne(ctx, accessToken, job.UserID, accountID, msgID); err != nil {
					if markErr := recordFailure(msgID, err); markErr != nil {
						return false, markErr
					}
					continue
				}
				deltaCount++
			}
		}
	}

	// Every record of this page has been applied.
	delete(job.CursorJSON, "historyListIndex")
	saveStats()
	if parsed.NextPageToken != "" {
		job.CursorJSON["historyPageToken"] = parsed.NextPageToken
		return true, nil
	}
	delete(job.CursorJSON, "historyPageToken")
	if parsed.HistoryID != "" {
		job.CursorJSON["historyId"] = parsed.HistoryID
	}
	return false, nil
}
// deleteByGmailID removes every row of the account whose uid is the hash of
// gmailID, whatever folder it is in. Blank IDs are a no-op.
func (g *GmailImporter) deleteByGmailID(ctx context.Context, accountID, gmailID string) error {
	if strings.TrimSpace(gmailID) != "" {
		_, err := g.db.Exec(ctx, `DELETE FROM messages WHERE account_id = $1::uuid AND uid = $2`, accountID, gmailUID(gmailID))
		return err
	}
	return nil
}
// isGmailHistoryNotFound reports whether err looks like Gmail rejecting a
// history request with HTTP 404 (start historyId no longer available). It
// inspects the error text and requires it to mention "history". This assumes
// the text carries the status code and the request URL — TODO confirm
// against apiGet.
//
// "404" must appear as a standalone token. History IDs and message IDs in the
// URL are long digit/hex strings that can contain "404" by chance (e.g.
// startHistoryId=1240456), and such an error (a 500, a 401, ...) must not
// trigger a history reset.
func isGmailHistoryNotFound(err error) bool {
	if err == nil {
		return false
	}
	msg := strings.ToLower(err.Error())
	if !strings.Contains(msg, "history") {
		return false
	}
	alnum := func(b byte) bool {
		return ('0' <= b && b <= '9') || ('a' <= b && b <= 'z') || ('A' <= b && b <= 'Z')
	}
	for i := 0; i+3 <= len(msg); i++ {
		if msg[i:i+3] != "404" {
			continue
		}
		boundedLeft := i == 0 || !alnum(msg[i-1])
		boundedRight := i+3 == len(msg) || !alnum(msg[i+3])
		if boundedLeft && boundedRight {
			return true
		}
	}
	return false
}
// fetchHistoryID reads the mailbox's current historyId from the Gmail profile
// endpoint. On any request or decoding failure it returns "" and the error.
func (g *GmailImporter) fetchHistoryID(ctx context.Context, accessToken string) (string, error) {
	var profile struct {
		HistoryID string `json:"historyId"`
	}
	body, err := g.apiGet(ctx, "https://gmail.googleapis.com/gmail/v1/users/me/profile", accessToken)
	if err == nil {
		err = json.Unmarshal(body, &profile)
	}
	if err != nil {
		return "", err
	}
	return profile.HistoryID, nil
}
// apiGet performs an authenticated GET against the Gmail API with the
// importer's HTTP client (via the package-level apiGet helper). Failures are
// prefixed with "gmail api: " and wrap the underlying error.
func (g *GmailImporter) apiGet(ctx context.Context, rawURL, accessToken string) ([]byte, error) {
	body, err := apiGet(ctx, g.client, rawURL, accessToken)
	if err == nil {
		return body, nil
	}
	return nil, fmt.Errorf("gmail api: %w", err)
}
// resolveMailAccountID picks the mail account that imported messages are
// written to for userID. It prefers the mail account linked to one of the
// user's mailboxes (an arbitrary one if several — the subquery has no
// ORDER BY), otherwise the user's oldest active mail account.
//
// When neither exists, the COALESCE yields NULL and the result is the error
// "no mail account for migration user". Genuine query failures (lost
// connection, malformed user id) are returned wrapped instead of being
// reported as a missing account.
func (g *GmailImporter) resolveMailAccountID(ctx context.Context, userID string) (string, error) {
	var accountID *string
	err := g.db.QueryRow(ctx, `
SELECT COALESCE(
(SELECT mail_account_id::text FROM mailboxes WHERE user_id = $1::uuid AND mail_account_id IS NOT NULL LIMIT 1),
(SELECT id::text FROM mail_accounts WHERE user_id = $1::uuid AND is_active ORDER BY created_at LIMIT 1)
)
`, userID).Scan(&accountID)
	if err != nil {
		return "", fmt.Errorf("resolve mail account for migration user: %w", err)
	}
	if accountID == nil || *accountID == "" {
		return "", fmt.Errorf("no mail account for migration user")
	}
	return *accountID, nil
}
// ensureDefaultMailFolders upserts the six standard top-level folders (inbox,
// sent, drafts, trash, spam, archive) for accountID, in that order. It stops
// at the first failure.
func ensureDefaultMailFolders(ctx context.Context, db *pgxpool.Pool, accountID string) error {
	type folderSpec struct{ name, remote, ftype string }
	specs := [...]folderSpec{
		{"Boîte de réception", "INBOX", "inbox"},
		{"Envoyés", "SENT", "sent"},
		{"Brouillons", "DRAFT", "drafts"},
		{"Corbeille", "TRASH", "trash"},
		{"Spam", "SPAM", "spam"},
		{"Archives", "ARCHIVE", "archive"},
	}
	for i := range specs {
		spec := specs[i]
		if _, err := ensureMailFolder(ctx, db, accountID, spec.name, spec.remote, spec.ftype, nil); err != nil {
			return err
		}
	}
	return nil
}
// ensureMailFolder upserts the folder identified by (accountID, remoteName)
// and returns its id as text. On conflict the existing row's name,
// folder_type and parent_id are overwritten with the given values; a nil
// parentID therefore clears parent_id.
func ensureMailFolder(ctx context.Context, db *pgxpool.Pool, accountID, name, remoteName, folderType string, parentID *string) (string, error) {
	const upsertFolder = `
INSERT INTO mail_folders (account_id, name, remote_name, folder_type, parent_id)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (account_id, remote_name) DO UPDATE
SET name = EXCLUDED.name,
folder_type = EXCLUDED.folder_type,
parent_id = EXCLUDED.parent_id,
updated_at = NOW()
RETURNING id::text
`
	var id string
	row := db.QueryRow(ctx, upsertFolder, accountID, name, remoteName, folderType, parentID)
	err := row.Scan(&id)
	return id, err
}
// displayFolderName returns the French display name for a standard folder
// type, or remote unchanged for any other type.
func displayFolderName(remote, folderType string) string {
	names := [...]struct{ ftype, label string }{
		{"inbox", "Boîte de réception"},
		{"sent", "Envoyés"},
		{"drafts", "Brouillons"},
		{"trash", "Corbeille"},
		{"spam", "Spam"},
		{"archive", "Archives"},
	}
	for _, entry := range names {
		if entry.ftype == folderType {
			return entry.label
		}
	}
	return remote
}
// primaryGmailFolder chooses the single local folder for a message from its
// Gmail label IDs. It returns the folder's remote name and type.
//
// TRASH and SPAM are checked first. Gmail can keep labels such as SENT or
// INBOX on a message that sits in Trash or Spam, and such a message belongs
// in the Trash/Spam folder rather than in Sent/Inbox. A message with none of
// the listed labels is treated as archived.
func primaryGmailFolder(labelIDs []string) (remoteName, folderType string) {
	priority := []struct{ label, remote, ftype string }{
		{"TRASH", "TRASH", "trash"},
		{"SPAM", "SPAM", "spam"},
		{"INBOX", "INBOX", "inbox"},
		{"SENT", "SENT", "sent"},
		{"DRAFT", "DRAFT", "drafts"},
	}
	set := make(map[string]struct{}, len(labelIDs))
	for _, l := range labelIDs {
		set[l] = struct{}{}
	}
	for _, p := range priority {
		if _, ok := set[p.label]; ok {
			return p.remote, p.ftype
		}
	}
	return "ARCHIVE", "archive"
}
// gmailUserLabels returns the lower-cased label IDs that are not one of the
// listed Gmail system labels, in input order. The result is never nil.
func gmailUserLabels(labelIDs []string) []string {
	userLabels := make([]string, 0, len(labelIDs))
	for _, id := range labelIDs {
		switch id {
		case "INBOX", "SENT", "DRAFT", "TRASH", "SPAM",
			"STARRED", "IMPORTANT", "UNREAD", "CATEGORY_PERSONAL",
			"CATEGORY_SOCIAL", "CATEGORY_PROMOTIONS", "CATEGORY_UPDATES",
			"CATEGORY_FORUMS":
			// System label: mapped to folders/flags elsewhere, not a user label.
		default:
			userLabels = append(userLabels, strings.ToLower(id))
		}
	}
	return userLabels
}
// gmailFlags maps Gmail label IDs to message flags. STARRED becomes
// "\Flagged" and IMPORTANT becomes "important", in label order. "\Seen" is
// appended last unless UNREAD is present. The result is never nil.
func gmailFlags(labelIDs []string) []string {
	out := make([]string, 0, len(labelIDs)+1)
	seen := true
	for _, id := range labelIDs {
		if id == "UNREAD" {
			seen = false
		} else if id == "STARRED" {
			out = append(out, "\\Flagged")
		} else if id == "IMPORTANT" {
			out = append(out, "important")
		}
	}
	if seen {
		out = append(out, "\\Seen")
	}
	return out
}
// gmailUID derives a stable, strictly positive int64 uid from a Gmail message
// ID. It takes the 64-bit FNV-1a hash with the sign bit cleared, and maps a
// zero result to 1.
func gmailUID(gmailID string) int64 {
	const positiveMask = uint64(1)<<63 - 1
	hasher := fnv.New64a()
	_, _ = hasher.Write([]byte(gmailID)) // hash.Hash writes never return an error
	if uid := int64(hasher.Sum64() & positiveMask); uid != 0 {
		return uid
	}
	return 1
}
// indexHeaders returns the message-level headers of a Gmail payload, keyed by
// lower-cased, trimmed header name. For a repeated header the first non-empty
// value wins.
//
// Only the payload's own (top-level) headers are indexed. Headers of nested
// MIME parts describe those parts, or for an attached message a different
// email entirely. Letting them fill in headers the outer message lacks would
// attribute, e.g., an attached email's Cc, In-Reply-To, References or
// Message-ID to the message being imported and corrupt addressing/threading.
func indexHeaders(p gmailPayload) map[string]string {
	out := make(map[string]string, len(p.Headers))
	for _, h := range p.Headers {
		key := strings.ToLower(strings.TrimSpace(h.Name))
		if key != "" && out[key] == "" {
			out[key] = h.Value
		}
	}
	return out
}
// extractGmailBodies returns the first text/plain and the first text/html
// body found in a depth-first, pre-order walk of the MIME tree (children in
// order), decoded with decodeGmailBody. Parts with empty body data are
// ignored; a kind that is never found is returned as "".
func extractGmailBodies(p gmailPayload) (text, html string) {
	pending := []gmailPayload{p}
	for len(pending) > 0 {
		last := len(pending) - 1
		node := pending[last]
		pending = pending[:last]
		if node.Body.Data != "" {
			switch node.MimeType {
			case "text/plain":
				if text == "" {
					text = decodeGmailBody(node.Body.Data)
				}
			case "text/html":
				if html == "" {
					html = decodeGmailBody(node.Body.Data)
				}
			}
		}
		// Push children in reverse so they are visited in document order.
		for i := len(node.Parts) - 1; i >= 0; i-- {
			pending = append(pending, node.Parts[i])
		}
	}
	return text, html
}
// decodeGmailBody decodes a Gmail API body.data value into a string. It
// returns "" when the input is not valid base64.
//
// The API's data is base64url, but "=" padding is not guaranteed. Input is
// therefore normalised to the URL-safe alphabet (standard-alphabet input is
// still accepted), any trailing padding is stripped, and the result is
// decoded without requiring padding. CR/LF characters are ignored by the
// decoder.
func decodeGmailBody(data string) string {
	data = strings.NewReplacer("+", "-", "/", "_").Replace(data)
	data = strings.TrimRight(data, "=")
	raw, err := base64.RawURLEncoding.DecodeString(data)
	if err != nil {
		return ""
	}
	return string(raw)
}
// parseAddressListJSON converts an address-list header value (From, To, Cc,
// Reply-To) into a JSON array of {"name","email"} objects; name is omitted
// when empty. Entries without an email address (e.g. from stray separators)
// are dropped. Blank input, or input without any address, yields "[]".
func parseAddressListJSON(raw string) []byte {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return []byte("[]")
	}
	type addr struct {
		Name  string `json:"name,omitempty"`
		Email string `json:"email"`
	}
	parts := splitAddresses(raw)
	out := make([]addr, 0, len(parts))
	for _, p := range parts {
		name, email := parseSingleAddress(p)
		if email == "" {
			continue
		}
		out = append(out, addr{Name: name, Email: email})
	}
	b, err := json.Marshal(out)
	if err != nil {
		// Cannot happen for a slice of string-only structs; stay well-formed.
		return []byte("[]")
	}
	return b
}
// splitAddresses splits an address-list header value on the commas that
// separate mailboxes. Commas inside a double-quoted display name (honouring
// backslash escapes) or inside <...> do not split, so `"Doe, John" <j@x.com>`
// stays one entry. Entries are whitespace-trimmed and blank entries are
// dropped. Scanning is byte-wise, which is safe for UTF-8 because every
// delimiter is ASCII.
func splitAddresses(raw string) []string {
	var parts []string
	add := func(s string) {
		if s = strings.TrimSpace(s); s != "" {
			parts = append(parts, s)
		}
	}
	inQuote, inAngle, escaped := false, false, false
	start := 0
	for i := 0; i < len(raw); i++ {
		c := raw[i]
		switch {
		case escaped:
			escaped = false
		case inQuote && c == '\\':
			escaped = true
		case c == '"' && !inAngle:
			inQuote = !inQuote
		case inQuote:
			// Commas and brackets inside a quoted display name are literal.
		case c == '<':
			inAngle = true
		case c == '>':
			inAngle = false
		case c == ',' && !inAngle:
			add(raw[start:i])
			start = i + 1
		}
	}
	add(raw[start:])
	return parts
}
// parseSingleAddress splits one mailbox such as `"Jane" <Jane@X.com>` into its
// display name (surrounding quotes removed) and lower-cased email. Input
// without a trailing <...> part is treated as a bare address with no name.
//
// The address is taken from the last '<', so a '<' inside the display name
// (e.g. `"a <b" <c@d.com>`) does not swallow part of the name.
func parseSingleAddress(raw string) (name, email string) {
	raw = strings.TrimSpace(raw)
	if i := strings.LastIndex(raw, "<"); i >= 0 && strings.HasSuffix(raw, ">") {
		name = strings.Trim(strings.TrimSpace(raw[:i]), `"`)
		email = strings.Trim(raw[i+1:len(raw)-1], " <>")
		return name, strings.ToLower(email)
	}
	return "", strings.ToLower(raw)
}
// parseReferences extracts the normalized message IDs from a References (or
// similar) header value. The result is never nil: an empty slice is returned
// when nothing valid is found. That keeps the value passed to the database
// consistent with the blank-header case instead of sometimes being nil (NULL).
//
// IDs are taken from each <...> token, which handles both whitespace-separated
// and adjacent (`<a@x><b@y>`) IDs. Values without any angle brackets fall back
// to whitespace splitting.
func parseReferences(raw string) []string {
	out := []string{}
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return out
	}
	var tokens []string
	rest := raw
	for {
		open := strings.IndexByte(rest, '<')
		if open < 0 {
			break
		}
		closeOff := strings.IndexByte(rest[open:], '>')
		if closeOff < 0 {
			break
		}
		end := open + closeOff + 1
		tokens = append(tokens, rest[open:end])
		rest = rest[end:]
	}
	if len(tokens) == 0 {
		tokens = strings.Fields(raw)
	}
	for _, tok := range tokens {
		if id := threading.NormalizeMessageID(tok); id != "" {
			out = append(out, id)
		}
	}
	return out
}
// parseMailDate parses a Date header into UTC. It falls back to the current
// time when the value is blank or unparseable.
//
// Besides RFC 1123 (with numeric or named zone) and RFC 3339, it accepts
// common RFC 5322 variants: single-digit day, missing day-of-week, missing
// seconds, and a trailing comment such as "(PST)", which is stripped before
// parsing.
func parseMailDate(raw string) time.Time {
	raw = strings.TrimSpace(raw)
	if i := strings.LastIndex(raw, "("); i > 0 && strings.HasSuffix(raw, ")") {
		raw = strings.TrimSpace(raw[:i])
	}
	if raw == "" {
		return time.Now().UTC()
	}
	layouts := []string{
		time.RFC1123Z,
		time.RFC1123,
		time.RFC3339,
		"Mon, 2 Jan 2006 15:04:05 -0700",
		"2 Jan 2006 15:04:05 -0700",
		"Mon, 2 Jan 2006 15:04:05 MST",
		"2 Jan 2006 15:04:05 MST",
		"Mon, 2 Jan 2006 15:04 -0700",
		"2 Jan 2006 15:04 -0700",
	}
	for _, layout := range layouts {
		if t, err := time.Parse(layout, raw); err == nil {
			return t.UTC()
		}
	}
	return time.Now().UTC()
}
// parseInternalDate parses Gmail's internalDate, a decimal string of
// milliseconds since the Unix epoch, into a UTC time.
//
// Only decimal digits (with surrounding whitespace) are accepted. fmt's %v
// integer scanning would also accept base prefixes ("0x…", leading-zero
// octal) and stop silently at trailing garbage ("12abc" → 12). Either case
// produces a wrong date instead of an error that lets the caller fall back to
// the Date header.
func parseInternalDate(raw string) (time.Time, error) {
	digits := strings.TrimSpace(raw)
	if digits == "" || strings.Trim(digits, "0123456789") != "" {
		return time.Time{}, fmt.Errorf("invalid gmail internalDate %q", raw)
	}
	var ms int64
	if _, err := fmt.Sscanf(digits, "%d", &ms); err != nil {
		return time.Time{}, err
	}
	return time.UnixMilli(ms).UTC(), nil
}
// truncateRunes trims surrounding whitespace from s and keeps at most its
// first n runes. The text is round-tripped through []rune, so invalid UTF-8
// bytes come back as U+FFFD.
func truncateRunes(s string, n int) string {
	runes := []rune(strings.TrimSpace(s))
	if len(runes) > n {
		runes = runes[:n]
	}
	return string(runes)
}
// Keeps the pgx import referenced; this file otherwise only uses pgxpool.
var _ error = pgx.ErrNoRows