ultisuite-backend/internal/migration/graph_import.go
R3D347HR4Y 951c88b1ca
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
feat(migration): graph childFolders, parent FK, B2B hardening
- Graph mail: discover nested childFolders, merge new folders into
  cached graphFolderQueue without breaking in-progress cursors
- Add mail_folders.parent_id (migration 000050) and wire hierarchy on import
- Shared drives: skip discovery on delta ticks, guard merge by project
- Provision: remove platform-domain email rewrite on claim
- Integration tests for nested folders, parent_id, delta childFolders mocks
2026-06-13 13:16:36 +02:00

842 lines
23 KiB
Go

package migration
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sort"
"strings"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/mail/sanitize"
"github.com/ultisuite/ulti-backend/internal/mail/threading"
)
// graphMessageSelect is the comma-separated $select field list appended to the
// folder message listing URL built by folderMessagesURL. The names line up
// with the JSON tags of graphMessage.
const graphMessageSelect = "id,subject,bodyPreview,body,from,toRecipients,ccRecipients,replyTo," +
"receivedDateTime,sentDateTime,parentFolderId,isRead,flag,internetMessageId,internetMessageHeaders"
// GraphImporter imports a user's mail from Microsoft Graph into the local
// mail tables.
type GraphImporter struct {
// db is the connection pool used for all folder and message reads/writes.
db *pgxpool.Pool
// client performs the Graph HTTP requests.
client *http.Client
// baseURL, when non-empty, replaces the default https://graph.microsoft.com
// host prefix (see graphURL).
baseURL string
// userUPN is the user principal handed to graphUserBase to build request paths.
userUPN string
// folders caches discovered Graph folders, keyed by Graph folder ID.
folders map[string]graphFolderMeta
}
// graphFolderMeta describes how one Graph mail folder maps onto a local folder.
type graphFolderMeta struct {
// RemoteName is the local remote name; nested folders use "/"-joined segments.
RemoteName string
// FolderType is the local folder type, e.g. "inbox", "sent" or "custom".
FolderType string
// ParentGraphID is the Graph ID of the parent folder; empty for top-level folders.
ParentGraphID string
}
// NewGraphImporter returns an importer backed by db, with an HTTP client that
// times out after 90 seconds and an empty folder cache.
func NewGraphImporter(db *pgxpool.Pool) *GraphImporter {
	importer := new(GraphImporter)
	importer.db = db
	importer.client = &http.Client{Timeout: 90 * time.Second}
	importer.folders = make(map[string]graphFolderMeta)
	return importer
}
// WithHTTPClient swaps in c as the HTTP client used for Graph requests and
// returns the receiver for chaining. A nil c leaves the current client in place.
func (g *GraphImporter) WithHTTPClient(c *http.Client) *GraphImporter {
	if c == nil {
		return g
	}
	g.client = c
	return g
}
// WithUserPrincipal stores upn, with surrounding whitespace removed, as the
// user principal for request paths and returns the receiver for chaining.
func (g *GraphImporter) WithUserPrincipal(upn string) *GraphImporter {
	trimmed := strings.TrimSpace(upn)
	g.userUPN = trimmed
	return g
}
// userBase returns the request path prefix that graphUserBase produces for the
// configured user principal.
func (g *GraphImporter) userBase() string {
	upn := g.userUPN
	return graphUserBase(upn)
}
// WithBaseURL overrides the Graph host prefix and returns the receiver for
// chaining. Surrounding whitespace and any trailing slashes are removed so
// that paths starting with "/" can be appended directly.
func (g *GraphImporter) WithBaseURL(baseURL string) *GraphImporter {
	cleaned := strings.TrimSpace(baseURL)
	g.baseURL = strings.TrimRight(cleaned, "/")
	return g
}
// graphURL prefixes path with the configured base URL, falling back to the
// public Graph endpoint when no override is set.
func (g *GraphImporter) graphURL(path string) string {
	host := "https://graph.microsoft.com"
	if g.baseURL != "" {
		host = g.baseURL
	}
	return host + path
}
// graphMessage is the subset of a Graph message resource decoded by the
// importer, from both folder listings and delta pages.
type graphMessage struct {
// ID is the Graph message ID; it is hashed into the local uid and used as the
// imported-item key.
ID string `json:"id"`
Subject string `json:"subject"`
// BodyPreview is used as the stored snippet when non-blank.
BodyPreview string `json:"bodyPreview"`
Body graphBody `json:"body"`
From graphRecipient `json:"from"`
ToRecipients []graphRecipient `json:"toRecipients"`
CcRecipients []graphRecipient `json:"ccRecipients"`
ReplyTo []graphRecipient `json:"replyTo"`
// ReceivedDateTime and SentDateTime are raw timestamp strings parsed by
// parseGraphTime; received is preferred over sent.
ReceivedDateTime string `json:"receivedDateTime"`
SentDateTime string `json:"sentDateTime"`
// ParentFolderID is the Graph ID of the containing folder; it selects the
// local destination folder.
ParentFolderID string `json:"parentFolderId"`
IsRead bool `json:"isRead"`
Flag graphFlag `json:"flag"`
// InternetMessageID is the preferred source for the stored message_id.
InternetMessageID string `json:"internetMessageId"`
// InternetMessageHeaders supplies message-id, in-reply-to and references.
InternetMessageHeaders []graphHeader `json:"internetMessageHeaders"`
// Removed is non-nil when the entry carries an "@removed" annotation; delta
// imports then delete the local copy instead of importing.
Removed *struct {
Reason string `json:"reason"`
} `json:"@removed"`
}
// graphBody is a message body as returned by Graph.
type graphBody struct {
// ContentType decides whether Content is stored as HTML or as text
// (see extractGraphBody).
ContentType string `json:"contentType"`
Content string `json:"content"`
}
// graphRecipient wraps one sender/recipient entry of a Graph message.
type graphRecipient struct {
EmailAddress graphEmailAddress `json:"emailAddress"`
}
// graphEmailAddress is the display name and address of a recipient.
type graphEmailAddress struct {
Name string `json:"name"`
Address string `json:"address"`
}
// graphFlag carries the follow-up flag state of a message.
type graphFlag struct {
// FlagStatus equal to "flagged" (case-insensitive) maps to the \Flagged flag.
FlagStatus string `json:"flagStatus"`
}
// graphHeader is one name/value pair from a message's internetMessageHeaders.
type graphHeader struct {
Name string `json:"name"`
Value string `json:"value"`
}
// ImportBatch runs one bounded step of a Graph mail import for job and reports
// the resulting state through update.
//
// Each call resolves the user's mail account, makes sure the default folders
// and the Graph-discovered folders exist locally, and loads the store of
// already imported items. It then dispatches on delta and the job cursor:
//   - delta with per-folder delta links in the cursor: one per-folder delta step;
//   - delta with a single "deltaLink" cursor entry: one page of that feed,
//     reported as "pending" while more pages remain and "completed" otherwise;
//   - anything else: one step of the full folder-by-folder import, which also
//     captures delta links at the end when delta is true.
//
// Setup errors are returned directly without calling update.
func (g *GraphImporter) ImportBatch(
	ctx context.Context,
	job *Job,
	accessToken string,
	delta bool,
	update func(status string, cursor, stats map[string]any, jobErr string) error,
) error {
	accountID, err := g.resolveMailAccountID(ctx, job.UserID)
	if err != nil {
		return err
	}
	if err = ensureDefaultMailFolders(ctx, g.db, accountID); err != nil {
		return err
	}
	if err = g.ensureGraphFolders(ctx, accessToken); err != nil {
		return err
	}
	if err = g.ensureGraphFolderRecords(ctx, accountID); err != nil {
		return err
	}
	items, err := LoadImportedItemStore(ctx, g.db, job.ID, job.CursorJSON)
	if err != nil {
		return err
	}

	if !delta {
		return g.importFullFolders(ctx, job, accessToken, accountID, items, delta, update)
	}
	if len(graphFolderDeltaLinks(job.CursorJSON)) > 0 {
		return g.importFolderDelta(ctx, job, accessToken, accountID, items, update)
	}
	singleLink, _ := job.CursorJSON["deltaLink"].(string)
	if singleLink == "" {
		// No delta state yet: run the full import, which seeds delta links at the end.
		return g.importFullFolders(ctx, job, accessToken, accountID, items, delta, update)
	}

	more, err := g.importDeltaPage(ctx, job, accessToken, accountID, singleLink, items)
	if err != nil {
		return err
	}
	status := "completed"
	if more {
		status = "pending"
	}
	return update(status, job.CursorJSON, job.StatsJSON, "")
}
// importFullFolders performs one step of the full import: it fetches one page
// of one folder and imports at most mailImportBatchSize() messages from it.
//
// Progress lives in the job cursor:
//   - "folderIndex": position in the folder queue;
//   - "nextLink": URL of the page to fetch for the current folder, if not the first;
//   - "listIndex": offset within the current page at which to resume.
//
// Once the queue is exhausted the job is reported "completed" (after seeding
// per-folder delta links when captureDelta is set); every other successful
// step reports "pending". A message whose import fails is recorded as failed
// and counted in the "failed" stat, and the step continues.
func (g *GraphImporter) importFullFolders(
	ctx context.Context,
	job *Job,
	accessToken, accountID string,
	items *ImportedItemStore,
	captureDelta bool,
	update func(status string, cursor, stats map[string]any, jobErr string) error,
) error {
	queue := g.folderQueue(job.CursorJSON)
	folderIndex := int(jsonNumber(job.CursorJSON["folderIndex"]))

	// Every folder has been walked: optionally seed delta links, then finish.
	if folderIndex >= len(queue) {
		if captureDelta {
			if err := g.bootstrapFolderDeltaLinks(ctx, accessToken, queue, job.CursorJSON); err != nil {
				return err
			}
		}
		job.StatsJSON["phase"] = "imported"
		return update("completed", job.CursorJSON, job.StatsJSON, "")
	}

	// Continue from the saved page when there is one, else start at the
	// folder's first page.
	pageURL, _ := job.CursorJSON["nextLink"].(string)
	if pageURL == "" {
		pageURL = g.folderMessagesURL(queue[folderIndex])
	}
	raw, err := g.apiGet(ctx, pageURL, accessToken)
	if err != nil {
		return err
	}
	var page struct {
		Value     []graphMessage `json:"value"`
		NextLink  string         `json:"@odata.nextLink"`
		DeltaLink string         `json:"@odata.deltaLink"`
	}
	if err := json.Unmarshal(raw, &page); err != nil {
		return err
	}

	importedCount, _ := job.StatsJSON["imported"].(float64)
	resumeAt := int(jsonNumber(job.CursorJSON["listIndex"]))
	processed := 0
	for pos := resumeAt; pos < len(page.Value) && processed < mailImportBatchSize(); pos++ {
		msg := page.Value[pos]
		// Whatever happens to this message, the next step resumes after it;
		// the error returns below never persist the cursor.
		resumeAt = pos + 1

		// Already-handled messages are skipped without using up the batch budget.
		if alreadyImported(items, msg.ID) {
			continue
		}
		created, err := g.importOne(ctx, accountID, msg)
		if err != nil {
			if markErr := items.MarkFailed(ctx, msg.ID, err.Error(), ""); markErr != nil {
				return markErr
			}
			incJobStat(job.StatsJSON, "failed")
			processed++
			continue
		}
		if err := items.MarkImported(ctx, msg.ID); err != nil {
			return err
		}
		if created {
			importedCount++
		}
		processed++
	}
	job.StatsJSON["imported"] = importedCount

	// Page not finished: remember the offset and come back to the same page.
	if resumeAt < len(page.Value) {
		job.CursorJSON["listIndex"] = float64(resumeAt)
		return update("pending", job.CursorJSON, job.StatsJSON, "")
	}
	delete(job.CursorJSON, "listIndex")

	if page.NextLink != "" {
		job.CursorJSON["nextLink"] = page.NextLink
	} else {
		// Folder finished: advance to the next one in the queue.
		delete(job.CursorJSON, "nextLink")
		job.CursorJSON["folderIndex"] = float64(folderIndex + 1)
	}
	return update("pending", job.CursorJSON, job.StatsJSON, "")
}
// importFolderDelta performs one step of the per-folder delta sync: it
// processes one delta page for the folder at cursor "folderIndex".
//
// The page URL is the folder's stored delta link, then the cursor's
// "nextLink", and failing both a freshly initialised delta link. When the
// folder has no more pages the cursor moves on to the next folder. The job is
// reported "completed" once folderIndex has passed the end of the queue and
// "pending" otherwise.
//
// NOTE(review): on completion "folderIndex" is left at len(queue) in the
// cursor. Unless the caller resets it, a later delta tick would complete
// immediately without polling any folder — confirm against the scheduler.
func (g *GraphImporter) importFolderDelta(
	ctx context.Context,
	job *Job,
	accessToken, accountID string,
	items *ImportedItemStore,
	update func(status string, cursor, stats map[string]any, jobErr string) error,
) error {
	queue := g.folderQueue(job.CursorJSON)
	folderIndex := int(jsonNumber(job.CursorJSON["folderIndex"]))
	if folderIndex >= len(queue) {
		job.StatsJSON["phase"] = "delta"
		return update("completed", job.CursorJSON, job.StatsJSON, "")
	}

	folderID := queue[folderIndex]
	link := graphFolderDeltaLinks(job.CursorJSON)[folderID]
	if link == "" {
		link, _ = job.CursorJSON["nextLink"].(string)
	}
	if link == "" {
		fresh, err := g.initFolderDeltaLink(ctx, accessToken, folderID)
		if err != nil {
			return err
		}
		link = fresh
	}

	more, err := g.importFolderDeltaPage(ctx, job, accessToken, accountID, folderID, link, items)
	if err != nil {
		return err
	}
	if !more {
		// This folder is drained for now; continue with the next one.
		delete(job.CursorJSON, "nextLink")
		job.CursorJSON["folderIndex"] = float64(folderIndex + 1)
	}
	return update("pending", job.CursorJSON, job.StatsJSON, "")
}
// importFolderDeltaPage fetches the delta page at deltaLink for folderID and
// applies it: entries annotated "@removed" are deleted locally, entries already
// in the imported-item store are skipped, and the rest are imported.
//
// The "delta_imported", "delta_deleted" and "phase" stats are updated, and the
// folder's stored link becomes the page's next link (more == true) or its
// delta link (more == false). A per-message import failure is recorded as
// failed and does not abort the page; store or delete errors do.
func (g *GraphImporter) importFolderDeltaPage(
	ctx context.Context,
	job *Job,
	accessToken, accountID, folderID, deltaLink string,
	items *ImportedItemStore,
) (more bool, err error) {
	raw, err := g.apiGet(ctx, deltaLink, accessToken)
	if err != nil {
		return false, err
	}
	var page struct {
		Value     []graphMessage `json:"value"`
		NextLink  string         `json:"@odata.nextLink"`
		DeltaLink string         `json:"@odata.deltaLink"`
	}
	if err := json.Unmarshal(raw, &page); err != nil {
		return false, err
	}

	added, _ := job.StatsJSON["delta_imported"].(float64)
	removed, _ := job.StatsJSON["delta_deleted"].(float64)
	for _, msg := range page.Value {
		switch {
		case msg.Removed != nil:
			if err := g.deleteByGraphID(ctx, accountID, msg.ID); err != nil {
				return false, err
			}
			removed++
		case alreadyImported(items, msg.ID):
			// Nothing to do for messages handled in an earlier step.
		default:
			created, err := g.importOne(ctx, accountID, msg)
			if err != nil {
				if markErr := items.MarkFailed(ctx, msg.ID, err.Error(), ""); markErr != nil {
					return false, markErr
				}
				incJobStat(job.StatsJSON, "failed")
				continue
			}
			if err := items.MarkImported(ctx, msg.ID); err != nil {
				return false, err
			}
			if created {
				added++
			}
		}
	}

	job.StatsJSON["delta_imported"] = added
	job.StatsJSON["delta_deleted"] = removed
	job.StatsJSON["phase"] = "delta"
	switch {
	case page.NextLink != "":
		setGraphFolderDeltaLink(job.CursorJSON, folderID, page.NextLink)
		return true, nil
	case page.DeltaLink != "":
		setGraphFolderDeltaLink(job.CursorJSON, folderID, page.DeltaLink)
	}
	return false, nil
}
// importDeltaPage fetches the delta page at deltaLink for the single,
// account-wide delta cursor and applies it: "@removed" entries are deleted
// locally, entries already in the imported-item store are skipped, and the
// rest are imported.
//
// The "delta_imported", "delta_deleted" and "phase" stats are updated, and the
// cursor's "deltaLink" becomes the page's next link (more == true) or its
// delta link (more == false). A per-message import failure is recorded as
// failed and does not abort the page; store or delete errors do.
func (g *GraphImporter) importDeltaPage(ctx context.Context, job *Job, accessToken, accountID, deltaLink string, items *ImportedItemStore) (more bool, err error) {
	raw, err := g.apiGet(ctx, deltaLink, accessToken)
	if err != nil {
		return false, err
	}
	var page struct {
		Value     []graphMessage `json:"value"`
		NextLink  string         `json:"@odata.nextLink"`
		DeltaLink string         `json:"@odata.deltaLink"`
	}
	if err := json.Unmarshal(raw, &page); err != nil {
		return false, err
	}

	added, _ := job.StatsJSON["delta_imported"].(float64)
	removed, _ := job.StatsJSON["delta_deleted"].(float64)
	for _, msg := range page.Value {
		if msg.Removed != nil {
			if err := g.deleteByGraphID(ctx, accountID, msg.ID); err != nil {
				return false, err
			}
			removed++
			continue
		}
		if alreadyImported(items, msg.ID) {
			continue
		}
		created, importErr := g.importOne(ctx, accountID, msg)
		if importErr != nil {
			if markErr := items.MarkFailed(ctx, msg.ID, importErr.Error(), ""); markErr != nil {
				return false, markErr
			}
			incJobStat(job.StatsJSON, "failed")
			continue
		}
		if err := items.MarkImported(ctx, msg.ID); err != nil {
			return false, err
		}
		if created {
			added++
		}
	}

	job.StatsJSON["delta_imported"] = added
	job.StatsJSON["delta_deleted"] = removed
	job.StatsJSON["phase"] = "delta"

	more = page.NextLink != ""
	switch {
	case more:
		job.CursorJSON["deltaLink"] = page.NextLink
	case page.DeltaLink != "":
		job.CursorJSON["deltaLink"] = page.DeltaLink
	}
	return more, nil
}
// folderMessagesURL builds the URL of the first message page for folderID:
// 100 messages per page, newest received first, restricted to the fields in
// graphMessageSelect.
func (g *GraphImporter) folderMessagesURL(folderID string) string {
	base := g.userBase()
	folder := url.PathEscape(folderID)
	order := url.QueryEscape("receivedDateTime desc")
	query := "?$top=100&$orderby=" + order + "&$select=" + graphMessageSelect
	return g.graphURL(base + "/mailFolders/" + folder + "/messages" + query)
}
// initFolderDeltaLink issues the initial messages/delta request for folderID
// and returns the link to continue from: the response's delta link when
// present, otherwise its next link (which may be empty).
//
// NOTE(review): the request selects only "id". If Graph carries that $select
// into the returned link, later delta pages would hold messages without
// subject, body or parentFolderId — confirm against the Graph delta docs.
func (g *GraphImporter) initFolderDeltaLink(ctx context.Context, accessToken, folderID string) (string, error) {
	endpoint := g.graphURL(g.userBase() + "/mailFolders/" + url.PathEscape(folderID) + "/messages/delta?$select=id")
	raw, err := g.apiGet(ctx, endpoint, accessToken)
	if err != nil {
		return "", err
	}
	var links struct {
		DeltaLink string `json:"@odata.deltaLink"`
		NextLink  string `json:"@odata.nextLink"`
	}
	if err := json.Unmarshal(raw, &links); err != nil {
		return "", err
	}
	if links.DeltaLink == "" {
		return links.NextLink, nil
	}
	return links.DeltaLink, nil
}
// bootstrapFolderDeltaLinks makes sure every folder in queue has a delta link
// stored in cursor, initialising one for each folder that lacks it. On success
// it removes the "deltaLink" and "folderIndex" cursor entries; on the first
// failed request it returns the error and leaves those entries in place.
func (g *GraphImporter) bootstrapFolderDeltaLinks(ctx context.Context, accessToken string, queue []string, cursor map[string]any) error {
	for _, folderID := range queue {
		if existing := graphFolderDeltaLinks(cursor)[folderID]; existing != "" {
			continue
		}
		link, err := g.initFolderDeltaLink(ctx, accessToken, folderID)
		if err != nil {
			return err
		}
		if link == "" {
			continue
		}
		setGraphFolderDeltaLink(cursor, folderID, link)
	}
	for _, key := range []string{"deltaLink", "folderIndex"} {
		delete(cursor, key)
	}
	return nil
}
// folderQueue returns the folder processing order for this job: the cached
// Graph folder IDs sorted lexically, passed through mergeGraphFolderQueue
// together with cursor.
func (g *GraphImporter) folderQueue(cursor map[string]any) []string {
	discovered := make([]string, 0, len(g.folders))
	for graphID := range g.folders {
		discovered = append(discovered, graphID)
	}
	// Map iteration order is random; sort so the queue is deterministic.
	sort.Strings(discovered)
	return mergeGraphFolderQueue(cursor, discovered)
}
// importOne upserts a single Graph message into the messages table and links
// it into a thread.
//
// The destination folder is looked up from the message's parentFolderId in
// the folder cache; messages whose folder is unknown are filed under ARCHIVE.
// The stored message_id comes from internetMessageId, then the Message-ID
// header, then a synthetic "<graph-ID@ultimail.migrated>" value. The date is
// the received time, then the sent time, then the current time.
//
// It returns true when no row for (folder, uid) existed before the call and
// false when an existing row was refreshed. Any database or threading error is
// returned, including a failure of the existence pre-check.
func (g *GraphImporter) importOne(ctx context.Context, accountID string, msg graphMessage) (bool, error) {
	meta := g.folders[msg.ParentFolderID]
	if meta.RemoteName == "" {
		meta = graphFolderMeta{RemoteName: "ARCHIVE", FolderType: "archive"}
	}
	parentID, err := g.parentFolderDBID(ctx, accountID, meta.ParentGraphID)
	if err != nil {
		return false, err
	}
	folderID, err := ensureMailFolder(ctx, g.db, accountID, displayFolderName(meta.RemoteName, meta.FolderType), meta.RemoteName, meta.FolderType, parentID)
	if err != nil {
		return false, err
	}

	headers := indexGraphHeaders(msg.InternetMessageHeaders)
	rfcID := threading.NormalizeMessageID(msg.InternetMessageID)
	if rfcID == "" {
		rfcID = threading.NormalizeMessageID(headers["message-id"])
	}
	if rfcID == "" {
		// No usable Message-ID at all: derive a stable one from the Graph ID.
		rfcID = threading.NormalizeMessageID("<graph-" + msg.ID + "@ultimail.migrated>")
	}
	inReplyTo := threading.NormalizeMessageID(headers["in-reply-to"])
	references := parseReferences(headers["references"])

	bodyText, bodyHTML := extractGraphBody(msg.Body)
	snippet := strings.TrimSpace(msg.BodyPreview)
	if snippet == "" {
		snippet = truncateRunes(bodyText, 200)
	}

	date := parseGraphTime(msg.ReceivedDateTime)
	if date.IsZero() {
		date = parseGraphTime(msg.SentDateTime)
	}
	if date.IsZero() {
		date = time.Now().UTC()
	}

	fromJSON := graphRecipientJSON(msg.From)
	toJSON := graphRecipientsJSON(msg.ToRecipients)
	ccJSON := graphRecipientsJSON(msg.CcRecipients)
	replyToJSON := graphRecipientsJSON(msg.ReplyTo)
	flags := graphFlags(msg.IsRead, msg.Flag.FlagStatus)
	uid := remoteMessageUID(msg.ID)

	// The pre-check only feeds the "newly created" result, but a failed lookup
	// must not be counted as a fresh import, so surface the error.
	var existed bool
	if err := g.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM messages WHERE folder_id = $1 AND uid = $2)`, folderID, uid).Scan(&existed); err != nil {
		return false, fmt.Errorf("check existing message: %w", err)
	}

	var messageID string
	err = g.db.QueryRow(ctx, `
INSERT INTO messages (
account_id, folder_id, uid, message_id, subject,
from_addr, to_addrs, cc_addrs, reply_to,
date, snippet, body_text, body_html, flags, labels,
in_reply_to, references_header
)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17)
ON CONFLICT (folder_id, uid) DO UPDATE SET
message_id = EXCLUDED.message_id,
subject = EXCLUDED.subject,
from_addr = EXCLUDED.from_addr,
to_addrs = EXCLUDED.to_addrs,
cc_addrs = EXCLUDED.cc_addrs,
reply_to = EXCLUDED.reply_to,
date = EXCLUDED.date,
snippet = EXCLUDED.snippet,
body_text = EXCLUDED.body_text,
body_html = EXCLUDED.body_html,
flags = EXCLUDED.flags,
in_reply_to = EXCLUDED.in_reply_to,
references_header = EXCLUDED.references_header,
updated_at = NOW()
RETURNING id
`, accountID, folderID, uid, rfcID, msg.Subject,
		fromJSON, toJSON, ccJSON, replyToJSON,
		date, snippet, bodyText, sanitize.SanitizeHTML(bodyHTML), flags, []string{},
		inReplyTo, references,
	).Scan(&messageID)
	if err != nil {
		return false, err
	}
	if err := threading.ApplyMessageThread(ctx, g.db, accountID, messageID, rfcID, inReplyTo, references); err != nil {
		return false, err
	}
	return !existed, nil
}
// deleteByGraphID removes the account's local message rows whose uid is
// derived from graphID. A blank graphID is a no-op.
func (g *GraphImporter) deleteByGraphID(ctx context.Context, accountID, graphID string) error {
	if strings.TrimSpace(graphID) == "" {
		return nil
	}
	_, err := g.db.Exec(
		ctx,
		`DELETE FROM messages WHERE account_id = $1::uuid AND uid = $2`,
		accountID, remoteMessageUID(graphID),
	)
	return err
}
// graphFolderEntry is one mail folder as decoded from a Graph folder listing.
type graphFolderEntry struct {
ID string `json:"id"`
DisplayName string `json:"displayName"`
// WellKnownName, when it matches a known value such as "inbox", selects the
// standard local folder (see graphWellKnownFolder).
WellKnownName string `json:"wellKnownName"`
}
// graphDiscoverEntry is a work-queue item for the folder discovery in
// ensureGraphFolders: a folder whose children still have to be listed.
type graphDiscoverEntry struct {
// id is the Graph ID of the folder whose children will be listed.
id string
// parentRemote is the remote name of this folder itself; it is handed to
// discoverGraphChildFolders as the prefix for its children's remote names.
parentRemote string
// parentGraphID is the Graph ID of this folder's parent; it is left empty
// for top-level folders.
parentGraphID string
}
// ensureGraphFolders fills the folder cache with every mail folder of the
// user: first all pages of top-level folders, then the children of each
// discovered folder, walked breadth-first. It does nothing when the cache is
// already populated.
//
// If any request fails the cache is emptied again before the error is
// returned. Otherwise a partial cache would pass the "already populated" check
// on the next call and silently hide the undiscovered folders.
func (g *GraphImporter) ensureGraphFolders(ctx context.Context, accessToken string) (err error) {
	if len(g.folders) > 0 {
		return nil
	}
	// Guard against an importer built without NewGraphImporter.
	if g.folders == nil {
		g.folders = map[string]graphFolderMeta{}
	}
	// The cache was empty on entry, so everything in it belongs to this call
	// and can be dropped wholesale on failure.
	defer func() {
		if err != nil {
			g.folders = map[string]graphFolderMeta{}
		}
	}()

	visited := map[string]struct{}{}
	discover := make([]graphDiscoverEntry, 0, 16)
	listURL := g.graphURL(g.userBase() + "/mailFolders?$top=100&$select=id,displayName,wellKnownName")
	for listURL != "" {
		entries, nextLink, pageErr := g.listGraphMailFoldersPage(ctx, accessToken, listURL)
		if pageErr != nil {
			return pageErr
		}
		for _, f := range entries {
			if _, ok := visited[f.ID]; ok {
				continue
			}
			visited[f.ID] = struct{}{}
			remote, ftype := graphWellKnownFolder(f.WellKnownName, f.DisplayName)
			g.folders[f.ID] = graphFolderMeta{RemoteName: remote, FolderType: ftype}
			discover = append(discover, graphDiscoverEntry{id: f.ID, parentRemote: remote})
		}
		listURL = nextLink
	}

	// The slice grows while it is being walked, so every newly found child is
	// itself searched for children.
	for i := 0; i < len(discover); i++ {
		entry := discover[i]
		childDiscover, childErr := g.discoverGraphChildFolders(ctx, accessToken, entry.id, entry.parentRemote, visited)
		if childErr != nil {
			return childErr
		}
		discover = append(discover, childDiscover...)
	}
	return nil
}
// listGraphMailFoldersPage fetches one page of a folder listing from listURL
// and returns its entries together with the URL of the following page (empty
// when there is none).
func (g *GraphImporter) listGraphMailFoldersPage(ctx context.Context, accessToken, listURL string) ([]graphFolderEntry, string, error) {
	type folderPage struct {
		Value    []graphFolderEntry `json:"value"`
		NextLink string             `json:"@odata.nextLink"`
	}

	raw, err := g.apiGet(ctx, listURL, accessToken)
	if err != nil {
		return nil, "", err
	}
	var page folderPage
	if decodeErr := json.Unmarshal(raw, &page); decodeErr != nil {
		return nil, "", decodeErr
	}
	return page.Value, page.NextLink, nil
}
// discoverGraphChildFolders lists all pages of the direct children of the
// Graph folder parentID, records each child not yet in visited in the folder
// cache with a remote name nested under parentRemote, and returns the new
// children so the caller can descend into them. visited is updated in place.
func (g *GraphImporter) discoverGraphChildFolders(
	ctx context.Context,
	accessToken, parentID, parentRemote string,
	visited map[string]struct{},
) ([]graphDiscoverEntry, error) {
	found := make([]graphDiscoverEntry, 0, 8)
	pageURL := g.graphURL(g.userBase() + "/mailFolders/" + url.PathEscape(parentID) +
		"/childFolders?$top=100&$select=id,displayName,wellKnownName")
	for pageURL != "" {
		children, following, err := g.listGraphMailFoldersPage(ctx, accessToken, pageURL)
		if err != nil {
			return nil, err
		}
		for _, child := range children {
			if _, seen := visited[child.ID]; seen {
				continue
			}
			visited[child.ID] = struct{}{}
			remote, kind := graphNestedFolderMeta(parentRemote, child.WellKnownName, child.DisplayName)
			g.folders[child.ID] = graphFolderMeta{
				RemoteName:    remote,
				FolderType:    kind,
				ParentGraphID: parentID,
			}
			found = append(found, graphDiscoverEntry{
				id:            child.ID,
				parentRemote:  remote,
				parentGraphID: parentID,
			})
		}
		pageURL = following
	}
	return found, nil
}
// ensureGraphFolderRecords creates or looks up (via ensureMailFolder) a local
// folder for every cached Graph folder and links each one to its parent's
// local ID.
//
// Folders are handled in order of nesting depth, measured as the number of
// "/" in the remote name and then by remote name. A child's remote name is
// its parent's plus "/segment", so parents always come first and their local
// ID is known by the time the child is handled.
func (g *GraphImporter) ensureGraphFolderRecords(ctx context.Context, accountID string) error {
	type pendingFolder struct {
		graphID string
		meta    graphFolderMeta
		depth   int
	}

	ordered := make([]pendingFolder, 0, len(g.folders))
	for id, meta := range g.folders {
		// strings.Count yields 0 for an empty remote name, matching depth 0.
		ordered = append(ordered, pendingFolder{
			graphID: id,
			meta:    meta,
			depth:   strings.Count(meta.RemoteName, "/"),
		})
	}
	sort.Slice(ordered, func(a, b int) bool {
		left, right := ordered[a], ordered[b]
		if left.depth == right.depth {
			return left.meta.RemoteName < right.meta.RemoteName
		}
		return left.depth < right.depth
	})

	localIDs := make(map[string]string, len(ordered))
	for _, folder := range ordered {
		var parentDB *string
		if parent := folder.meta.ParentGraphID; parent != "" {
			if parentLocal, known := localIDs[parent]; known {
				parentDB = &parentLocal
			}
		}
		localID, err := ensureMailFolder(
			ctx, g.db, accountID,
			displayFolderName(folder.meta.RemoteName, folder.meta.FolderType),
			folder.meta.RemoteName, folder.meta.FolderType,
			parentDB,
		)
		if err != nil {
			return err
		}
		localIDs[folder.graphID] = localID
	}
	return nil
}
// parentFolderDBID resolves the local mail_folders ID of the Graph folder
// parentGraphID for accountID. It returns (nil, nil) when parentGraphID is
// empty, not in the folder cache, or cached without a remote name; a failed
// lookup (including no matching row) is returned as an error.
func (g *GraphImporter) parentFolderDBID(ctx context.Context, accountID, parentGraphID string) (*string, error) {
	meta, known := g.folders[parentGraphID]
	if parentGraphID == "" || !known || meta.RemoteName == "" {
		return nil, nil
	}
	localID := new(string)
	row := g.db.QueryRow(ctx, `
SELECT id::text FROM mail_folders WHERE account_id = $1::uuid AND remote_name = $2
`, accountID, meta.RemoteName)
	if err := row.Scan(localID); err != nil {
		return nil, err
	}
	return localID, nil
}
func graphNestedFolderMeta(parentRemote, wellKnown, displayName string) (remoteName, folderType string) {
if strings.TrimSpace(wellKnown) != "" {
remote, ftype := graphWellKnownFolder(wellKnown, displayName)
if parentRemote != "" {
return parentRemote + "/" + remote, ftype
}
return remote, ftype
}
segment := graphCustomFolderSegment(displayName)
if parentRemote == "" {
return segment, "custom"
}
return parentRemote + "/" + segment, "custom"
}
// graphCustomFolderSegment turns a folder display name into a remote-name
// segment: trimmed, spaces replaced by underscores, upper-cased. A blank name
// becomes "CUSTOM".
func graphCustomFolderSegment(displayName string) string {
	trimmed := strings.TrimSpace(displayName)
	if trimmed == "" {
		return "CUSTOM"
	}
	underscored := strings.ReplaceAll(trimmed, " ", "_")
	return strings.ToUpper(underscored)
}
// graphWellKnownFolder maps a Graph wellKnownName (matched case-insensitively,
// ignoring surrounding whitespace) to the local remote name and folder type.
// Names outside the table fall back to a "custom" folder named after
// displayName.
func graphWellKnownFolder(wellKnown, displayName string) (remoteName, folderType string) {
	type target struct{ remote, kind string }
	standard := map[string]target{
		"inbox":        {"INBOX", "inbox"},
		"sentitems":    {"SENT", "sent"},
		"drafts":       {"DRAFT", "drafts"},
		"deleteditems": {"TRASH", "trash"},
		"junkemail":    {"SPAM", "spam"},
		"archive":      {"ARCHIVE", "archive"},
	}
	key := strings.ToLower(strings.TrimSpace(wellKnown))
	if hit, ok := standard[key]; ok {
		return hit.remote, hit.kind
	}
	return graphCustomFolderSegment(displayName), "custom"
}
// graphFlags converts Graph read/flag state into the stored flag list:
// "\Seen" for read messages and "\Flagged" when flagStatus is "flagged"
// (case-insensitive). The result is never nil.
func graphFlags(isRead bool, flagStatus string) []string {
	result := make([]string, 0, 2)
	if isRead {
		result = append(result, "\\Seen")
	}
	if flagged := strings.EqualFold(flagStatus, "flagged"); flagged {
		result = append(result, "\\Flagged")
	}
	return result
}
// extractGraphBody splits a Graph body into text and HTML parts. Content is
// returned as html when the content type is "html" (case-insensitive) and as
// text for every other content type; the other result is empty.
func extractGraphBody(body graphBody) (text, html string) {
	if strings.ToLower(body.ContentType) == "html" {
		return "", body.Content
	}
	return body.Content, ""
}
// graphRecipientJSON encodes a single recipient as a JSON array holding one
// {"name","email"} object, or "[]" when the address is blank.
//
// The address is trimmed and lower-cased before it is emitted, matching
// graphRecipientsJSON, so a padded sender address is not stored with its
// surrounding whitespace.
func graphRecipientJSON(r graphRecipient) []byte {
	email := strings.ToLower(strings.TrimSpace(r.EmailAddress.Address))
	if email == "" {
		return []byte("[]")
	}
	type addr struct {
		Name  string `json:"name,omitempty"`
		Email string `json:"email"`
	}
	// Marshalling a slice of plain string fields cannot fail, so the error is
	// intentionally ignored.
	b, _ := json.Marshal([]addr{{Name: r.EmailAddress.Name, Email: email}})
	return b
}
// graphRecipientsJSON encodes recipients as a JSON array of {"name","email"}
// objects. Addresses are trimmed and lower-cased, entries with a blank address
// are dropped, and an empty or nil input yields "[]".
func graphRecipientsJSON(recipients []graphRecipient) []byte {
	type addr struct {
		Name  string `json:"name,omitempty"`
		Email string `json:"email"`
	}
	kept := make([]addr, 0, len(recipients))
	for i := range recipients {
		entry := recipients[i].EmailAddress
		normalized := strings.ToLower(strings.TrimSpace(entry.Address))
		if normalized != "" {
			kept = append(kept, addr{Name: entry.Name, Email: normalized})
		}
	}
	encoded, _ := json.Marshal(kept)
	return encoded
}
// indexGraphHeaders builds a lookup of header values keyed by lower-cased,
// trimmed header name. Headers with a blank name are ignored, and for repeated
// names the first non-empty value wins.
func indexGraphHeaders(headers []graphHeader) map[string]string {
	index := make(map[string]string)
	for _, header := range headers {
		name := strings.ToLower(strings.TrimSpace(header.Name))
		if name == "" {
			continue
		}
		if current := index[name]; current != "" {
			continue
		}
		index[name] = header.Value
	}
	return index
}
// parseGraphTime parses a Graph timestamp (RFC 3339, with or without
// fractional seconds) and returns it in UTC. Blank or unparseable input yields
// the zero time.
func parseGraphTime(raw string) time.Time {
	value := strings.TrimSpace(raw)
	if value == "" {
		return time.Time{}
	}
	for _, layout := range [...]string{time.RFC3339Nano, time.RFC3339} {
		if parsed, err := time.Parse(layout, value); err == nil {
			return parsed.UTC()
		}
	}
	return time.Time{}
}
// apiGet performs an authenticated GET through the package-level apiGet helper
// using the importer's HTTP client, wrapping any failure with a "graph api"
// prefix.
func (g *GraphImporter) apiGet(ctx context.Context, url, accessToken string) ([]byte, error) {
	payload, err := apiGet(ctx, g.client, url, accessToken)
	if err == nil {
		return payload, nil
	}
	return nil, fmt.Errorf("graph api: %w", err)
}
// resolveMailAccountID looks up the mail account ID for userID by delegating
// to a GmailImporter created on the same database pool.
func (g *GraphImporter) resolveMailAccountID(ctx context.Context, userID string) (string, error) {
	return NewGmailImporter(g.db).resolveMailAccountID(ctx, userID)
}
// remoteMessageUID derives the numeric message uid for a remote message ID by
// delegating to gmailUID.
func remoteMessageUID(remoteID string) int64 {
	uid := gmailUID(remoteID)
	return uid
}