ultisuite-backend/internal/migration/drive_import.go
R3D347HR4Y 951c88b1ca
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
feat(migration): graph childFolders, parent FK, B2B hardening
- Graph mail: discover nested childFolders, merge new folders into
  cached graphFolderQueue without breaking in-progress cursors
- Add mail_folders.parent_id (migration 000050) and wire hierarchy on import
- Shared drives: skip discovery on delta ticks, guard merge by project
- Provision: remove platform-domain email rewrite on claim
- Integration tests for nested folders, parent_id, delta childFolders mocks
2026-06-13 13:16:36 +02:00

552 lines
16 KiB
Go

package migration
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/nextcloud"
)
// maxDriveFileBytes is the size threshold (25 MiB) above which
// uploadToNextcloud switches from a single Upload to UploadStreaming.
const maxDriveFileBytes = 25 << 20
// DriveImporter copies files from a cloud drive (Google Drive, or Microsoft
// Graph for any other provider) into a user's Nextcloud storage, one
// resumable batch at a time.
type DriveImporter struct {
	// Persistence and destination backends.
	db *pgxpool.Pool
	nc *nextcloud.Client

	// client performs the provider API and download requests.
	client *http.Client
	// userUPN is passed to graphMicrosoftURL when building Graph URLs.
	userUPN string

	// Project-scoped shared-drive settings, populated by WithProject.
	projectID       string
	sharedDriveMode string
	sharedDedup     *SharedDriveItemStore
}
// NewDriveImporter builds a DriveImporter backed by db and nc that uses
// migrationHTTPClient() for provider requests. Optional settings are applied
// afterwards with the With* methods.
func NewDriveImporter(db *pgxpool.Pool, nc *nextcloud.Client) *DriveImporter {
	imp := &DriveImporter{client: migrationHTTPClient()}
	imp.db = db
	imp.nc = nc
	return imp
}
// WithUserPrincipal stores upn (surrounding whitespace removed) as the
// principal passed to graphMicrosoftURL, and returns d for chaining.
func (d *DriveImporter) WithUserPrincipal(upn string) *DriveImporter {
	trimmed := strings.TrimSpace(upn)
	d.userUPN = trimmed
	return d
}
// WithHTTPClient overrides the HTTP client used for provider requests. A nil
// c leaves the current client untouched. It returns d for chaining.
func (d *DriveImporter) WithHTTPClient(c *http.Client) *DriveImporter {
	if c == nil {
		return d
	}
	d.client = c
	return d
}
// WithProject configures project-level shared-drive handling: the trimmed
// project ID, the normalized shared-drive mode, and the cross-job dedup store
// (which may be nil to disable shared-drive dedup). It returns d for chaining.
func (d *DriveImporter) WithProject(projectID, sharedDriveMode string, dedup *SharedDriveItemStore) *DriveImporter {
	d.projectID, d.sharedDriveMode, d.sharedDedup =
		strings.TrimSpace(projectID), NormalizeSharedDriveMode(sharedDriveMode), dedup
	return d
}
// isSharedDriveDedup reports whether project-wide dedup applies: the item must
// come from a shared drive with a known ID and a dedup store must be set.
func (d *DriveImporter) isSharedDriveDedup(driveID string, shared bool) bool {
	if !shared || driveID == "" {
		return false
	}
	return d.sharedDedup != nil
}
// alreadyImportedShared reports whether sourceID on shared drive driveID has
// been recorded in the project dedup store. It is always false when shared
// dedup does not apply (see isSharedDriveDedup).
func (d *DriveImporter) alreadyImportedShared(driveID, sourceID string, shared bool) bool {
	return d.isSharedDriveDedup(driveID, shared) && d.sharedDedup.Has(driveID, sourceID)
}
// markSharedImported records sourceID (imported at relPath by jobID) in the
// project dedup store. It is a no-op returning nil when shared dedup does not
// apply.
func (d *DriveImporter) markSharedImported(ctx context.Context, driveID, sourceID, relPath, jobID string, shared bool) error {
	if d.isSharedDriveDedup(driveID, shared) {
		return d.sharedDedup.MarkImported(ctx, driveID, sourceID, relPath, jobID)
	}
	return nil
}
// ImportBatch performs one resumable tick of a drive migration for job.
//
// A tick does one of three things:
//   - When delta is true and a change cursor is already stored, it delegates
//     to importDriveDelta.
//   - When every queued folder has been visited, it finalizes the crawl. If
//     delta is requested, it bootstraps a delta cursor first.
//   - Otherwise it lists one page of the current queued folder and imports up
//     to driveImportBatchSize() of its items into the user's Nextcloud under
//     /Migration/<provider>.
//
// Progress is persisted through update using these cursor keys:
//   - folderIndex: position in the folder queue.
//   - listIndex: first not-yet-inspected item in the current page.
//   - pageToken (google) or nextLink (other providers): the next page.
//
// Per-item download/upload failures are recorded in the import store and the
// "failed" stat instead of aborting the tick. Errors from user resolution,
// listing, the import store, shared-drive discovery, or update are returned.
func (d *DriveImporter) ImportBatch(ctx context.Context, job *Job, accessToken, provider string, delta bool, update progressUpdater) error {
	if d.nc == nil {
		return fmt.Errorf("nextcloud required for drive migration")
	}
	user, err := resolveMigrationUser(ctx, d.db, job.UserID)
	if err != nil {
		return err
	}
	ncUserID := nextcloud.UserIDFromClaims(user.Email, user.ExternalID)
	if _, err := d.nc.EnsurePrincipal(ctx, user.Email, user.ExternalID, user.Name); err != nil {
		return fmt.Errorf("nextcloud user: %w", err)
	}
	root := fmt.Sprintf("/Migration/%s", provider)
	// The error is deliberately ignored (most likely "already exists" after the
	// first tick); upload failures below are recorded per item.
	_ = d.nc.CreateFolder(ctx, ncUserID, root)
	store, err := LoadImportedItemStore(ctx, d.db, job.ID, job.CursorJSON)
	if err != nil {
		return err
	}
	if provider == "google" && !jsonBool(job.CursorJSON["sharedDrivesBootstrapped"]) {
		// Delta-only ticks already have a change token; skip shared-drive discovery API calls.
		if !(delta && d.hasDriveDeltaCursor(job, provider)) {
			if err := d.bootstrapSharedDrives(ctx, job, accessToken); err != nil {
				return err
			}
		}
		job.CursorJSON["sharedDrivesBootstrapped"] = true
	}
	if provider == "google" && strings.TrimSpace(job.ProjectID) != "" {
		if err := d.mergeSharedDriveFolders(ctx, job, provider); err != nil {
			return err
		}
	}
	if delta && d.hasDriveDeltaCursor(job, provider) {
		return d.importDriveDelta(ctx, job, accessToken, provider, ncUserID, root, store, update)
	}
	imported, _ := job.StatsJSON["imported"].(float64)
	skipped, _ := job.StatsJSON["skipped"].(float64)
	exported, _ := job.StatsJSON["exported"].(float64)
	batch := 0
	queue := readDriveFolderQueue(job.CursorJSON, provider)
	folderIndex := int(jsonNumber(job.CursorJSON["folderIndex"]))
	if folderIndex < 0 {
		// A corrupt cursor must not index the queue out of range.
		folderIndex = 0
	}
	if folderIndex >= len(queue) {
		if delta && !d.hasDriveDeltaCursor(job, provider) {
			if err := d.bootstrapDriveDelta(ctx, accessToken, provider, job.CursorJSON); err != nil {
				return err
			}
		}
		job.StatsJSON["imported"] = imported
		job.StatsJSON["skipped"] = skipped
		job.StatsJSON["exported"] = exported
		if delta && d.hasDriveDeltaCursor(job, provider) {
			job.StatsJSON["phase"] = "delta_ready"
		} else {
			job.StatsJSON["phase"] = "imported"
		}
		return update("completed", job.CursorJSON, job.StatsJSON, "")
	}
	current := queue[folderIndex]
	folderItems, nextCursor, subfolders, err := d.listDriveFolderItems(ctx, accessToken, provider, current, job.CursorJSON)
	if err != nil {
		return err
	}
	listIndex := int(jsonNumber(job.CursorJSON["listIndex"]))
	if listIndex < 0 {
		listIndex = 0
	}
	// next is the scan position in folderItems. It advances past every
	// inspected item, including those skipped as already imported, so the
	// persisted listIndex always points at the first item not yet looked at.
	// Deriving it from listIndex+batch undercounted those skips and could keep
	// re-persisting the same listIndex once the tail of a page was done,
	// leaving the job pending forever.
	next := listIndex
	for ; next < len(folderItems) && batch < driveImportBatchSize(); next++ {
		item := folderItems[next]
		if alreadyImported(store, item.ID) {
			continue
		}
		if d.alreadyImportedShared(current.DriveID, item.ID, current.Shared) {
			skipped++
			if err := store.MarkSkipped(ctx, item.ID, "dedup: shared drive file already imported by project", relPathForItem(current, item)); err != nil {
				return err
			}
			batch++
			continue
		}
		relPath := path.Join(current.Path, sanitizeDrivePath(item.Name))
		targetPath := path.Join(root, relPath)
		switch {
		case item.IsFolder:
			if err := d.nc.CreateFolder(ctx, ncUserID, targetPath); err != nil {
				if markErr := store.MarkFailed(ctx, item.ID, err.Error(), relPath); markErr != nil {
					return markErr
				}
				incJobStat(job.StatsJSON, "failed")
				batch++
				continue
			}
			if err := store.MarkPath(ctx, item.ID, relPath); err != nil {
				return err
			}
			queue = enqueueDriveFolder(queue, driveFolderRef{
				ID: item.ID, Path: relPath, DriveID: current.DriveID, Shared: current.Shared,
			})
		case item.Export:
			content, contentType, fileName, err := d.downloadGoogleExport(ctx, accessToken, item)
			if err != nil {
				skipped++
				if err := store.MarkSkipped(ctx, item.ID, "export: "+err.Error(), relPath); err != nil {
					return err
				}
				batch++
				continue
			}
			// The export name is derived from the raw provider name, so it needs
			// the same sanitizing as every other path segment; otherwise a '/'
			// in a document title would create nested folders.
			fileName = sanitizeDrivePath(fileName)
			targetPath = path.Join(path.Dir(targetPath), fileName)
			relPath = path.Join(path.Dir(relPath), fileName)
			if err := d.uploadToNextcloud(ctx, ncUserID, targetPath, content, contentType, 0); err != nil {
				if markErr := store.MarkFailed(ctx, item.ID, err.Error(), relPath); markErr != nil {
					return markErr
				}
				incJobStat(job.StatsJSON, "failed")
				batch++
				continue
			}
			exported++
			if pdfMime, pdfExt, ok := googleSlidesPDFExport(item.MimeType); ok {
				pdfItem := item
				pdfItem.ExportMime = pdfMime
				pdfItem.ExportExt = pdfExt
				// Best-effort companion PDF: download and upload failures are ignored.
				if pdfContent, pdfType, pdfName, err := d.downloadGoogleExport(ctx, accessToken, pdfItem); err == nil {
					pdfRel := path.Join(path.Dir(relPath), sanitizeDrivePath(pdfName))
					pdfTarget := path.Join(root, pdfRel)
					// uploadToNextcloud closes pdfContent. Calling d.nc.Upload
					// directly left the export response body open.
					if err := d.uploadToNextcloud(ctx, ncUserID, pdfTarget, pdfContent, pdfType, 0); err == nil {
						if err := store.MarkPath(ctx, item.ID+"_pdf", pdfRel); err != nil {
							return err
						}
					}
				}
			}
		default:
			content, contentType, err := d.downloadDriveFile(ctx, accessToken, item)
			if err != nil {
				if markErr := store.MarkFailed(ctx, item.ID, err.Error(), relPath); markErr != nil {
					return markErr
				}
				incJobStat(job.StatsJSON, "failed")
				batch++
				continue
			}
			if err := d.uploadToNextcloud(ctx, ncUserID, targetPath, content, contentType, item.Size); err != nil {
				if markErr := store.MarkFailed(ctx, item.ID, err.Error(), relPath); markErr != nil {
					return markErr
				}
				incJobStat(job.StatsJSON, "failed")
				batch++
				continue
			}
		}
		if err := store.MarkImported(ctx, item.ID); err != nil {
			return err
		}
		if !item.IsFolder {
			if err := store.MarkPath(ctx, item.ID, relPath); err != nil {
				return err
			}
			if err := d.markSharedImported(ctx, current.DriveID, item.ID, relPath, job.ID, current.Shared); err != nil {
				return err
			}
		}
		imported++
		batch++
	}
	for _, sub := range subfolders {
		relPath := path.Join(current.Path, sanitizeDrivePath(sub.Name))
		queue = enqueueDriveFolder(queue, driveFolderRef{
			ID: sub.ID, Path: relPath, DriveID: current.DriveID, Shared: current.Shared,
		})
	}
	writeDriveFolderQueue(job.CursorJSON, queue)
	job.StatsJSON["imported"] = imported
	job.StatsJSON["skipped"] = skipped
	job.StatsJSON["exported"] = exported
	if next < len(folderItems) {
		// Batch budget exhausted mid-page: resume at the first uninspected item.
		job.CursorJSON["listIndex"] = float64(next)
		return update("pending", job.CursorJSON, job.StatsJSON, "")
	}
	delete(job.CursorJSON, "listIndex")
	if nextCursor != "" {
		if provider == "google" {
			job.CursorJSON["pageToken"] = nextCursor
		} else {
			job.CursorJSON["nextLink"] = nextCursor
		}
		return update("pending", job.CursorJSON, job.StatsJSON, "")
	}
	// Folder fully listed: clear paging state and move to the next queued folder.
	delete(job.CursorJSON, "pageToken")
	delete(job.CursorJSON, "nextLink")
	job.CursorJSON["folderIndex"] = float64(folderIndex + 1)
	return update("pending", job.CursorJSON, job.StatsJSON, "")
}
// driveItem is one entry of a provider folder listing, normalized across the
// Google Drive and Microsoft Graph responses.
type driveItem struct {
	ID       string // provider item ID
	Name     string // display name; Google Workspace exports carry the export extension
	ParentID string // NOTE(review): not populated by listDriveFolderItems
	IsFolder bool
	Size     int64 // size from the listing response; 0 when absent
	MimeType string
	Download string // direct download URL; empty for folders and exports

	// Google Workspace export settings (Export=true means "export, don't download").
	Export     bool
	ExportMime string
	ExportExt  string

	DriveID string // NOTE(review): not set by listDriveFolderItems
}
// driveSubfolder identifies a non-empty child folder reported by a Graph
// listing, to be queued for a later crawl.
type driveSubfolder struct {
	ID   string // provider folder ID
	Name string // raw display name (sanitized by the caller)
}
// listDriveFolderItems fetches one page of the direct children of folder and
// returns (items, nextPageCursor, subfolders, err).
//
// For provider "google" it queries Drive v3 files (resuming from
// cursor["pageToken"]). Google Workspace documents are flagged for export,
// other files get an alt=media download URL, and subfolders is always nil
// because folders are returned as items.
//
// For any other provider it queries Microsoft Graph children (resuming from
// cursor["nextLink"]). Folders are returned as items and, when childCount > 0,
// also in subfolders.
func (d *DriveImporter) listDriveFolderItems(ctx context.Context, accessToken, provider string, folder driveFolderRef, cursor map[string]any) ([]driveItem, string, []driveSubfolder, error) {
	if provider == "google" {
		token, _ := cursor["pageToken"].(string)
		query := url.QueryEscape("'" + folder.ID + "' in parents and trashed=false")
		endpoint := "https://www.googleapis.com/drive/v3/files?pageSize=100&fields=nextPageToken,files(id,name,mimeType,size)&q=" + query
		endpoint += googleDriveListParams(folder)
		if token != "" {
			endpoint += "&pageToken=" + url.QueryEscape(token)
		}
		raw, err := apiGet(ctx, d.client, endpoint, accessToken)
		if err != nil {
			return nil, "", nil, err
		}
		var page struct {
			Files []struct {
				ID       string `json:"id"`
				Name     string `json:"name"`
				MimeType string `json:"mimeType"`
				Size     string `json:"size"`
			} `json:"files"`
			NextPageToken string `json:"nextPageToken"`
		}
		if err := json.Unmarshal(raw, &page); err != nil {
			return nil, "", nil, err
		}
		items := make([]driveItem, 0, len(page.Files))
		for _, f := range page.Files {
			var size int64
			if f.Size != "" {
				// Parse errors are ignored; the size is only a streaming hint.
				fmt.Sscan(f.Size, &size)
			}
			entry := driveItem{
				ID:       f.ID,
				Name:     f.Name,
				IsFolder: f.MimeType == "application/vnd.google-apps.folder",
				Size:     size,
				MimeType: f.MimeType,
			}
			if !entry.IsFolder {
				if exportMime, ext, ok := googleWorkspaceExport(f.MimeType); ok {
					entry.Export = true
					entry.ExportMime = exportMime
					entry.ExportExt = ext
					entry.Name = driveExportFileName(f.Name, ext)
				} else {
					entry.Download = googleDriveDownloadURL(f.ID, folder.Shared)
				}
			}
			items = append(items, entry)
		}
		return items, page.NextPageToken, nil, nil
	}

	var endpoint string
	switch folder.ID {
	case "root":
		endpoint = graphMicrosoftURL(d.userUPN, "/drive/root/children?$top=100&$select=id,name,folder,file,size")
	default:
		endpoint = graphMicrosoftURL(d.userUPN, "/drive/items/"+url.PathEscape(folder.ID)+"/children?$top=100&$select=id,name,folder,file,size")
	}
	if link, _ := cursor["nextLink"].(string); link != "" {
		endpoint = link
	}
	raw, err := apiGet(ctx, d.client, endpoint, accessToken)
	if err != nil {
		return nil, "", nil, err
	}
	var page struct {
		Value []struct {
			ID     string `json:"id"`
			Name   string `json:"name"`
			Folder *struct {
				ChildCount int `json:"childCount"`
			} `json:"folder"`
			File *struct {
				MimeType string `json:"mimeType"`
			} `json:"file"`
			Size int64 `json:"size"`
		} `json:"value"`
		NextLink string `json:"@odata.nextLink"`
	}
	if err := json.Unmarshal(raw, &page); err != nil {
		return nil, "", nil, err
	}
	items := make([]driveItem, 0, len(page.Value))
	var subfolders []driveSubfolder
	for _, v := range page.Value {
		if v.Folder != nil {
			items = append(items, driveItem{ID: v.ID, Name: v.Name, IsFolder: true})
			if v.Folder.ChildCount > 0 {
				subfolders = append(subfolders, driveSubfolder{ID: v.ID, Name: v.Name})
			}
			continue
		}
		var mime string
		if v.File != nil {
			mime = v.File.MimeType
		}
		items = append(items, driveItem{
			ID:       v.ID,
			Name:     v.Name,
			Size:     v.Size,
			MimeType: mime,
			Download: graphMicrosoftURL(d.userUPN, "/drive/items/"+url.PathEscape(v.ID)+"/content"),
		})
	}
	return items, page.NextLink, subfolders, nil
}
// downloadDriveFile issues an authenticated GET for item.Download and returns
// the response body (the caller must close it) plus a content type. The type
// is taken from the response Content-Type header, falling back to
// item.MimeType and then "application/octet-stream".
func (d *DriveImporter) downloadDriveFile(ctx context.Context, accessToken string, item driveItem) (io.ReadCloser, string, error) {
	request, err := http.NewRequestWithContext(ctx, http.MethodGet, item.Download, nil)
	if err != nil {
		return nil, "", err
	}
	request.Header.Set("Authorization", "Bearer "+accessToken)
	res, err := migrationDo(ctx, d.client, request)
	if err != nil {
		return nil, "", fmt.Errorf("download %s: %w", item.Name, err)
	}
	mediaType := res.Header.Get("Content-Type")
	switch {
	case mediaType != "":
		// Server-provided type wins.
	case item.MimeType != "":
		mediaType = item.MimeType
	default:
		mediaType = "application/octet-stream"
	}
	return res.Body, mediaType, nil
}
// downloadGoogleExport exports a Google Workspace document as item.ExportMime
// through the Drive v3 export endpoint. It returns the response body (the
// caller must close it), the content type (the response header, or
// item.ExportMime when absent), and the target file name built by
// driveExportFileName.
func (d *DriveImporter) downloadGoogleExport(ctx context.Context, accessToken string, item driveItem) (io.ReadCloser, string, string, error) {
	endpoint := "https://www.googleapis.com/drive/v3/files/" + url.PathEscape(item.ID) +
		"/export?mimeType=" + url.QueryEscape(item.ExportMime)
	if item.DriveID != "" {
		endpoint += "&supportsAllDrives=true"
	}
	request, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, "", "", err
	}
	request.Header.Set("Authorization", "Bearer "+accessToken)
	res, err := migrationDo(ctx, d.client, request)
	if err != nil {
		return nil, "", "", fmt.Errorf("export %s: %w", item.Name, err)
	}
	mediaType := item.ExportMime
	if header := res.Header.Get("Content-Type"); header != "" {
		mediaType = header
	}
	return res.Body, mediaType, driveExportFileName(item.Name, item.ExportExt), nil
}
// sanitizeDrivePath turns one provider-supplied name into a single safe path
// segment. It trims surrounding whitespace and replaces '/' and '\' with '-'.
//
// Names that would be empty, or that act as relative path components ("." and
// ".."), become "untitled". Without that mapping, path.Join would resolve a
// ".." segment and let an item named ".." escape the migration root.
func sanitizeDrivePath(name string) string {
	name = strings.TrimSpace(name)
	name = strings.NewReplacer("/", "-", "\\", "-").Replace(name)
	switch name {
	case "", ".", "..":
		return "untitled"
	}
	return name
}
// relPathForItem returns the migration-relative path of item inside folder,
// using the sanitized item name as the final segment.
func relPathForItem(folder driveFolderRef, item driveItem) string {
	segment := sanitizeDrivePath(item.Name)
	return path.Join(folder.Path, segment)
}
// jsonBool interprets a decoded JSON cursor value as a boolean. It returns the
// bool itself, true for any non-zero float64, true for the strings "true" and
// "1", and false for every other value (including nil).
func jsonBool(v any) bool {
	if b, ok := v.(bool); ok {
		return b
	}
	if f, ok := v.(float64); ok {
		return f != 0
	}
	if s, ok := v.(string); ok {
		return s == "true" || s == "1"
	}
	return false
}
// googleDriveListParams returns the extra files.list query parameters for
// folder. Shared-drive folders with a known DriveID are scoped to that drive
// (corpora=drive). Everything else only gets supportsAllDrives=true.
func googleDriveListParams(folder driveFolderRef) string {
	if !folder.Shared || folder.DriveID == "" {
		return "&supportsAllDrives=true"
	}
	scoped := "&corpora=drive&driveId=" + url.QueryEscape(folder.DriveID)
	return scoped + "&includeItemsFromAllDrives=true&supportsAllDrives=true"
}
// googleDriveDownloadURL builds the Drive v3 alt=media URL for fileID, adding
// supportsAllDrives=true when the file lives on a shared drive.
func googleDriveDownloadURL(fileID string, shared bool) string {
	const base = "https://www.googleapis.com/drive/v3/files/"
	query := "?alt=media"
	if shared {
		query += "&supportsAllDrives=true"
	}
	return base + url.PathEscape(fileID) + query
}
// uploadToNextcloud writes content to targetPath in ncUserID's storage and
// always closes content. Files whose reported size exceeds maxDriveFileBytes
// use UploadStreaming; all others (including size 0 / unknown) use Upload.
func (d *DriveImporter) uploadToNextcloud(ctx context.Context, ncUserID, targetPath string, content io.ReadCloser, contentType string, size int64) error {
	defer content.Close()
	if size <= maxDriveFileBytes {
		return d.nc.Upload(ctx, ncUserID, targetPath, content, contentType)
	}
	return d.nc.UploadStreaming(ctx, ncUserID, targetPath, content, contentType, size)
}
// bootstrapSharedDrives pages through the Drive v3 drives list visible to
// accessToken and records each shared drive for the job's project via
// upsertDiscoveredSharedDrive, using the importer's shared-drive mode. It
// stops at the first API, decode, or upsert error.
func (d *DriveImporter) bootstrapSharedDrives(ctx context.Context, job *Job, accessToken string) error {
	const drivesURL = "https://www.googleapis.com/drive/v3/drives?pageSize=100&fields=nextPageToken,drives(id,name)"
	token := ""
	for {
		reqURL := drivesURL
		if token != "" {
			reqURL += "&pageToken=" + url.QueryEscape(token)
		}
		raw, err := apiGet(ctx, d.client, reqURL, accessToken)
		if err != nil {
			return err
		}
		var page struct {
			Drives []struct {
				ID   string `json:"id"`
				Name string `json:"name"`
			} `json:"drives"`
			NextPageToken string `json:"nextPageToken"`
		}
		if err := json.Unmarshal(raw, &page); err != nil {
			return err
		}
		for _, sd := range page.Drives {
			if err := d.upsertDiscoveredSharedDrive(ctx, job.ProjectID, job.UserID, sd.ID, sd.Name, d.sharedDriveMode); err != nil {
				return err
			}
		}
		if page.NextPageToken == "" {
			return nil
		}
		token = page.NextPageToken
	}
}
// mergeSharedDriveFolders adds the project's approved shared-drive folders
// (from loadApprovedSharedDriveFolders) to the job's persisted folder queue
// via enqueueDriveFolder, which presumably skips folders already queued
// (verify). It is a no-op for providers other than "google".
func (d *DriveImporter) mergeSharedDriveFolders(ctx context.Context, job *Job, provider string) error {
	if provider != "google" {
		return nil
	}
	queue := readDriveFolderQueue(job.CursorJSON, provider)
	approved, err := d.loadApprovedSharedDriveFolders(ctx, job.ProjectID)
	if err != nil {
		return err
	}
	for i := range approved {
		queue = enqueueDriveFolder(queue, approved[i])
	}
	writeDriveFolderQueue(job.CursorJSON, queue)
	return nil
}