- Added endpoints for listing and importing migration rosters. - Introduced audit export functionality for migration jobs in CSV and NDJSON formats. - Implemented tenant mismatch validation for Microsoft migration claims. - Enhanced error handling for email claiming and migration processes. - Added integration tests for roster import and claim workflows.
549 lines
16 KiB
Go
549 lines
16 KiB
Go
package migration
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"strings"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/ultisuite/ulti-backend/internal/nextcloud"
|
|
)
|
|
|
|
// maxDriveFileBytes is the size threshold (25 MiB) above which
// uploadToNextcloud uses the streaming upload instead of the plain upload.
const maxDriveFileBytes = 25 << 20
|
|
|
|
type DriveImporter struct {
|
|
db *pgxpool.Pool
|
|
nc *nextcloud.Client
|
|
client *http.Client
|
|
userUPN string
|
|
projectID string
|
|
sharedDriveMode string
|
|
sharedDedup *SharedDriveItemStore
|
|
}
|
|
|
|
func NewDriveImporter(db *pgxpool.Pool, nc *nextcloud.Client) *DriveImporter {
|
|
return &DriveImporter{db: db, nc: nc, client: migrationHTTPClient()}
|
|
}
|
|
|
|
func (d *DriveImporter) WithUserPrincipal(upn string) *DriveImporter {
|
|
d.userUPN = strings.TrimSpace(upn)
|
|
return d
|
|
}
|
|
|
|
func (d *DriveImporter) WithHTTPClient(c *http.Client) *DriveImporter {
|
|
if c != nil {
|
|
d.client = c
|
|
}
|
|
return d
|
|
}
|
|
|
|
func (d *DriveImporter) WithProject(projectID, sharedDriveMode string, dedup *SharedDriveItemStore) *DriveImporter {
|
|
d.projectID = strings.TrimSpace(projectID)
|
|
d.sharedDriveMode = NormalizeSharedDriveMode(sharedDriveMode)
|
|
d.sharedDedup = dedup
|
|
return d
|
|
}
|
|
|
|
func (d *DriveImporter) isSharedDriveDedup(driveID string, shared bool) bool {
|
|
return shared && driveID != "" && d.sharedDedup != nil
|
|
}
|
|
|
|
func (d *DriveImporter) alreadyImportedShared(driveID, sourceID string, shared bool) bool {
|
|
if !d.isSharedDriveDedup(driveID, shared) {
|
|
return false
|
|
}
|
|
return d.sharedDedup.Has(driveID, sourceID)
|
|
}
|
|
|
|
func (d *DriveImporter) markSharedImported(ctx context.Context, driveID, sourceID, relPath, jobID string, shared bool) error {
|
|
if !d.isSharedDriveDedup(driveID, shared) {
|
|
return nil
|
|
}
|
|
return d.sharedDedup.MarkImported(ctx, driveID, sourceID, relPath, jobID)
|
|
}
|
|
|
|
func (d *DriveImporter) ImportBatch(ctx context.Context, job *Job, accessToken, provider string, delta bool, update progressUpdater) error {
|
|
if d.nc == nil {
|
|
return fmt.Errorf("nextcloud required for drive migration")
|
|
}
|
|
user, err := resolveMigrationUser(ctx, d.db, job.UserID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ncUserID := nextcloud.UserIDFromClaims(user.Email, user.ExternalID)
|
|
if _, err := d.nc.EnsurePrincipal(ctx, user.Email, user.ExternalID, user.Name); err != nil {
|
|
return fmt.Errorf("nextcloud user: %w", err)
|
|
}
|
|
root := fmt.Sprintf("/Migration/%s", provider)
|
|
_ = d.nc.CreateFolder(ctx, ncUserID, root)
|
|
store, err := LoadImportedItemStore(ctx, d.db, job.ID, job.CursorJSON)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if provider == "google" && !jsonBool(job.CursorJSON["sharedDrivesBootstrapped"]) {
|
|
if err := d.bootstrapSharedDrives(ctx, job, accessToken); err != nil {
|
|
return err
|
|
}
|
|
job.CursorJSON["sharedDrivesBootstrapped"] = true
|
|
}
|
|
if provider == "google" {
|
|
if err := d.mergeSharedDriveFolders(ctx, job, provider); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if delta && d.hasDriveDeltaCursor(job, provider) {
|
|
return d.importDriveDelta(ctx, job, accessToken, provider, ncUserID, root, store, update)
|
|
}
|
|
|
|
imported, _ := job.StatsJSON["imported"].(float64)
|
|
skipped, _ := job.StatsJSON["skipped"].(float64)
|
|
exported, _ := job.StatsJSON["exported"].(float64)
|
|
batch := 0
|
|
|
|
queue := readDriveFolderQueue(job.CursorJSON, provider)
|
|
folderIndex := int(jsonNumber(job.CursorJSON["folderIndex"]))
|
|
if folderIndex >= len(queue) {
|
|
if delta && !d.hasDriveDeltaCursor(job, provider) {
|
|
if err := d.bootstrapDriveDelta(ctx, accessToken, provider, job.CursorJSON); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
job.StatsJSON["imported"] = imported
|
|
job.StatsJSON["skipped"] = skipped
|
|
job.StatsJSON["exported"] = exported
|
|
if delta && d.hasDriveDeltaCursor(job, provider) {
|
|
job.StatsJSON["phase"] = "delta_ready"
|
|
} else {
|
|
job.StatsJSON["phase"] = "imported"
|
|
}
|
|
return update("completed", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
|
|
current := queue[folderIndex]
|
|
folderItems, nextCursor, subfolders, err := d.listDriveFolderItems(ctx, accessToken, provider, current, job.CursorJSON)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
listIndex := int(jsonNumber(job.CursorJSON["listIndex"]))
|
|
for i := listIndex; i < len(folderItems) && batch < driveImportBatchSize(); i++ {
|
|
item := folderItems[i]
|
|
if alreadyImported(store, item.ID) {
|
|
continue
|
|
}
|
|
if d.alreadyImportedShared(current.DriveID, item.ID, current.Shared) {
|
|
skipped++
|
|
if err := store.MarkSkipped(ctx, item.ID, "dedup: shared drive file already imported by project", relPathForItem(current, item)); err != nil {
|
|
return err
|
|
}
|
|
batch++
|
|
continue
|
|
}
|
|
relPath := path.Join(current.Path, sanitizeDrivePath(item.Name))
|
|
targetPath := path.Join(root, relPath)
|
|
if item.IsFolder {
|
|
if err := d.nc.CreateFolder(ctx, ncUserID, targetPath); err != nil {
|
|
if markErr := store.MarkFailed(ctx, item.ID, err.Error(), relPath); markErr != nil {
|
|
return markErr
|
|
}
|
|
incJobStat(job.StatsJSON, "failed")
|
|
batch++
|
|
continue
|
|
}
|
|
if err := store.MarkPath(ctx, item.ID, relPath); err != nil {
|
|
return err
|
|
}
|
|
queue = enqueueDriveFolder(queue, driveFolderRef{
|
|
ID: item.ID, Path: relPath, DriveID: current.DriveID, Shared: current.Shared,
|
|
})
|
|
} else {
|
|
if item.Export {
|
|
content, contentType, fileName, err := d.downloadGoogleExport(ctx, accessToken, item)
|
|
if err != nil {
|
|
skipped++
|
|
if err := store.MarkSkipped(ctx, item.ID, "export: "+err.Error(), relPath); err != nil {
|
|
return err
|
|
}
|
|
batch++
|
|
continue
|
|
}
|
|
targetPath = path.Join(path.Dir(targetPath), fileName)
|
|
relPath = path.Join(path.Dir(relPath), fileName)
|
|
if err := d.uploadToNextcloud(ctx, ncUserID, targetPath, content, contentType, 0); err != nil {
|
|
if markErr := store.MarkFailed(ctx, item.ID, err.Error(), relPath); markErr != nil {
|
|
return markErr
|
|
}
|
|
incJobStat(job.StatsJSON, "failed")
|
|
batch++
|
|
continue
|
|
}
|
|
exported++
|
|
if pdfMime, pdfExt, ok := googleSlidesPDFExport(item.MimeType); ok {
|
|
pdfItem := item
|
|
pdfItem.ExportMime = pdfMime
|
|
pdfItem.ExportExt = pdfExt
|
|
if pdfContent, pdfType, pdfName, err := d.downloadGoogleExport(ctx, accessToken, pdfItem); err == nil {
|
|
pdfRel := path.Join(path.Dir(relPath), pdfName)
|
|
pdfTarget := path.Join(root, pdfRel)
|
|
if err := d.nc.Upload(ctx, ncUserID, pdfTarget, pdfContent, pdfType); err == nil {
|
|
if err := store.MarkPath(ctx, item.ID+"_pdf", pdfRel); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
content, contentType, err := d.downloadDriveFile(ctx, accessToken, item)
|
|
if err != nil {
|
|
if markErr := store.MarkFailed(ctx, item.ID, err.Error(), relPath); markErr != nil {
|
|
return markErr
|
|
}
|
|
incJobStat(job.StatsJSON, "failed")
|
|
batch++
|
|
continue
|
|
}
|
|
if err := d.uploadToNextcloud(ctx, ncUserID, targetPath, content, contentType, item.Size); err != nil {
|
|
if markErr := store.MarkFailed(ctx, item.ID, err.Error(), relPath); markErr != nil {
|
|
return markErr
|
|
}
|
|
incJobStat(job.StatsJSON, "failed")
|
|
batch++
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
if err := store.MarkImported(ctx, item.ID); err != nil {
|
|
return err
|
|
}
|
|
if !item.IsFolder {
|
|
if err := store.MarkPath(ctx, item.ID, relPath); err != nil {
|
|
return err
|
|
}
|
|
if err := d.markSharedImported(ctx, current.DriveID, item.ID, relPath, job.ID, current.Shared); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
imported++
|
|
batch++
|
|
}
|
|
|
|
for _, sub := range subfolders {
|
|
relPath := path.Join(current.Path, sanitizeDrivePath(sub.Name))
|
|
queue = enqueueDriveFolder(queue, driveFolderRef{
|
|
ID: sub.ID, Path: relPath, DriveID: current.DriveID, Shared: current.Shared,
|
|
})
|
|
}
|
|
writeDriveFolderQueue(job.CursorJSON, queue)
|
|
|
|
job.StatsJSON["imported"] = imported
|
|
job.StatsJSON["skipped"] = skipped
|
|
job.StatsJSON["exported"] = exported
|
|
|
|
if listIndex+batch < len(folderItems) {
|
|
job.CursorJSON["listIndex"] = float64(listIndex + batch)
|
|
return update("pending", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
delete(job.CursorJSON, "listIndex")
|
|
|
|
if nextCursor != "" {
|
|
if provider == "google" {
|
|
job.CursorJSON["pageToken"] = nextCursor
|
|
} else {
|
|
job.CursorJSON["nextLink"] = nextCursor
|
|
}
|
|
return update("pending", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
delete(job.CursorJSON, "pageToken")
|
|
delete(job.CursorJSON, "nextLink")
|
|
|
|
job.CursorJSON["folderIndex"] = float64(folderIndex + 1)
|
|
delete(job.CursorJSON, "listIndex")
|
|
return update("pending", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
|
|
// driveItem is the provider-neutral description of one entry returned by a
// drive folder listing.
type driveItem struct {
	// ID is the provider's identifier for the entry.
	ID string
	// Name is the display name; for Google Workspace documents it already
	// carries the export file extension.
	Name string
	// ParentID is the identifier of the containing folder.
	// NOTE(review): not populated by the listing code in this file — confirm usage.
	ParentID string
	// IsFolder marks folder entries.
	IsFolder bool
	// Size is the size reported by the provider listing (zero when absent).
	Size int64
	// MimeType is the provider-reported MIME type, if any.
	MimeType string
	// Download is the URL fetched by downloadDriveFile for regular files.
	Download string
	// Export is true when the entry must be fetched through the Google
	// export endpoint instead of a direct download.
	Export bool
	// ExportMime is the MIME type requested from the export endpoint.
	ExportMime string
	// ExportExt is the file extension matching ExportMime.
	ExportExt string
	// DriveID, when non-empty, makes downloadGoogleExport append
	// supportsAllDrives=true to the export request.
	DriveID string
}
|
|
|
|
// driveSubfolder identifies a child folder discovered while listing a drive
// folder, so that it can be queued for traversal.
type driveSubfolder struct {
	// ID is the provider's identifier for the folder.
	ID string
	// Name is the folder's display name.
	Name string
}
|
|
|
|
func (d *DriveImporter) listDriveFolderItems(ctx context.Context, accessToken, provider string, folder driveFolderRef, cursor map[string]any) ([]driveItem, string, []driveSubfolder, error) {
|
|
switch provider {
|
|
case "google":
|
|
pageToken, _ := cursor["pageToken"].(string)
|
|
q := url.QueryEscape("'" + folder.ID + "' in parents and trashed=false")
|
|
listURL := "https://www.googleapis.com/drive/v3/files?pageSize=100&fields=nextPageToken,files(id,name,mimeType,size)&q=" + q
|
|
listURL += googleDriveListParams(folder)
|
|
if pageToken != "" {
|
|
listURL += "&pageToken=" + url.QueryEscape(pageToken)
|
|
}
|
|
body, err := apiGet(ctx, d.client, listURL, accessToken)
|
|
if err != nil {
|
|
return nil, "", nil, err
|
|
}
|
|
var parsed struct {
|
|
Files []struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
MimeType string `json:"mimeType"`
|
|
Size string `json:"size"`
|
|
} `json:"files"`
|
|
NextPageToken string `json:"nextPageToken"`
|
|
}
|
|
if err := json.Unmarshal(body, &parsed); err != nil {
|
|
return nil, "", nil, err
|
|
}
|
|
out := make([]driveItem, 0, len(parsed.Files))
|
|
for _, f := range parsed.Files {
|
|
size := int64(0)
|
|
if f.Size != "" {
|
|
fmt.Sscan(f.Size, &size)
|
|
}
|
|
item := driveItem{
|
|
ID: f.ID,
|
|
Name: f.Name,
|
|
IsFolder: f.MimeType == "application/vnd.google-apps.folder",
|
|
Size: size,
|
|
MimeType: f.MimeType,
|
|
}
|
|
if item.IsFolder {
|
|
out = append(out, item)
|
|
continue
|
|
}
|
|
if exportMime, ext, ok := googleWorkspaceExport(f.MimeType); ok {
|
|
item.Export = true
|
|
item.ExportMime = exportMime
|
|
item.ExportExt = ext
|
|
item.Name = driveExportFileName(f.Name, ext)
|
|
} else {
|
|
item.Download = googleDriveDownloadURL(f.ID, folder.Shared)
|
|
}
|
|
out = append(out, item)
|
|
}
|
|
return out, parsed.NextPageToken, nil, nil
|
|
default:
|
|
nextLink, _ := cursor["nextLink"].(string)
|
|
var listURL string
|
|
if folder.ID == "root" {
|
|
listURL = graphMicrosoftURL(d.userUPN, "/drive/root/children?$top=100&$select=id,name,folder,file,size")
|
|
} else {
|
|
listURL = graphMicrosoftURL(d.userUPN, "/drive/items/"+url.PathEscape(folder.ID)+"/children?$top=100&$select=id,name,folder,file,size")
|
|
}
|
|
if nextLink != "" {
|
|
listURL = nextLink
|
|
}
|
|
body, err := apiGet(ctx, d.client, listURL, accessToken)
|
|
if err != nil {
|
|
return nil, "", nil, err
|
|
}
|
|
var parsed struct {
|
|
Value []struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Folder *struct {
|
|
ChildCount int `json:"childCount"`
|
|
} `json:"folder"`
|
|
File *struct {
|
|
MimeType string `json:"mimeType"`
|
|
} `json:"file"`
|
|
Size int64 `json:"size"`
|
|
} `json:"value"`
|
|
NextLink string `json:"@odata.nextLink"`
|
|
}
|
|
if err := json.Unmarshal(body, &parsed); err != nil {
|
|
return nil, "", nil, err
|
|
}
|
|
out := make([]driveItem, 0, len(parsed.Value))
|
|
var subs []driveSubfolder
|
|
for _, f := range parsed.Value {
|
|
if f.Folder != nil {
|
|
out = append(out, driveItem{ID: f.ID, Name: f.Name, IsFolder: true})
|
|
if f.Folder.ChildCount > 0 {
|
|
subs = append(subs, driveSubfolder{ID: f.ID, Name: f.Name})
|
|
}
|
|
continue
|
|
}
|
|
mime := ""
|
|
if f.File != nil {
|
|
mime = f.File.MimeType
|
|
}
|
|
out = append(out, driveItem{
|
|
ID: f.ID,
|
|
Name: f.Name,
|
|
Size: f.Size,
|
|
MimeType: mime,
|
|
Download: graphMicrosoftURL(d.userUPN, "/drive/items/"+url.PathEscape(f.ID)+"/content"),
|
|
})
|
|
}
|
|
return out, parsed.NextLink, subs, nil
|
|
}
|
|
}
|
|
|
|
func (d *DriveImporter) downloadDriveFile(ctx context.Context, accessToken string, item driveItem) (io.ReadCloser, string, error) {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, item.Download, nil)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+accessToken)
|
|
resp, err := migrationDo(ctx, d.client, req)
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("download %s: %w", item.Name, err)
|
|
}
|
|
contentType := resp.Header.Get("Content-Type")
|
|
if contentType == "" {
|
|
contentType = item.MimeType
|
|
}
|
|
if contentType == "" {
|
|
contentType = "application/octet-stream"
|
|
}
|
|
return resp.Body, contentType, nil
|
|
}
|
|
|
|
func (d *DriveImporter) downloadGoogleExport(ctx context.Context, accessToken string, item driveItem) (io.ReadCloser, string, string, error) {
|
|
exportURL := fmt.Sprintf(
|
|
"https://www.googleapis.com/drive/v3/files/%s/export?mimeType=%s",
|
|
url.PathEscape(item.ID),
|
|
url.QueryEscape(item.ExportMime),
|
|
)
|
|
if item.DriveID != "" {
|
|
exportURL += "&supportsAllDrives=true"
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, exportURL, nil)
|
|
if err != nil {
|
|
return nil, "", "", err
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+accessToken)
|
|
resp, err := migrationDo(ctx, d.client, req)
|
|
if err != nil {
|
|
return nil, "", "", fmt.Errorf("export %s: %w", item.Name, err)
|
|
}
|
|
contentType := resp.Header.Get("Content-Type")
|
|
if contentType == "" {
|
|
contentType = item.ExportMime
|
|
}
|
|
return resp.Body, contentType, driveExportFileName(item.Name, item.ExportExt), nil
|
|
}
|
|
|
|
// sanitizeDrivePath turns a provider-supplied item name into a single safe
// path component.
//
// Surrounding whitespace is trimmed and both "/" and "\" are replaced with
// "-" so the name cannot introduce extra path segments. An empty result
// becomes "untitled". The special names "." and ".." are rewritten to "_" and
// "__": callers combine the result with path.Join, which would otherwise
// collapse them and place the item in the current or parent directory.
func sanitizeDrivePath(name string) string {
	name = strings.TrimSpace(name)
	name = strings.ReplaceAll(name, "/", "-")
	name = strings.ReplaceAll(name, "\\", "-")
	switch name {
	case "":
		return "untitled"
	case ".", "..":
		// Keep the length so "." and ".." stay distinguishable.
		return strings.Repeat("_", len(name))
	}
	return name
}
|
|
|
|
func relPathForItem(folder driveFolderRef, item driveItem) string {
|
|
return path.Join(folder.Path, sanitizeDrivePath(item.Name))
|
|
}
|
|
|
|
// jsonBool interprets a loosely typed value as a boolean. A bool is returned
// as is, a float64 is true when non-zero, and a string is true only when it
// equals "true" or "1". Every other type, including nil, yields false.
func jsonBool(v any) bool {
	if flag, ok := v.(bool); ok {
		return flag
	}
	if num, ok := v.(float64); ok {
		return num != 0
	}
	if text, ok := v.(string); ok {
		return text == "true" || text == "1"
	}
	return false
}
|
|
|
|
func googleDriveListParams(folder driveFolderRef) string {
|
|
if folder.Shared && folder.DriveID != "" {
|
|
return "&corpora=drive&driveId=" + url.QueryEscape(folder.DriveID) +
|
|
"&includeItemsFromAllDrives=true&supportsAllDrives=true"
|
|
}
|
|
return "&supportsAllDrives=true"
|
|
}
|
|
|
|
// googleDriveDownloadURL builds the Drive v3 media download URL for fileID,
// path-escaping the ID. When shared is true, supportsAllDrives=true is
// appended to the query.
func googleDriveDownloadURL(fileID string, shared bool) string {
	const base = "https://www.googleapis.com/drive/v3/files/"
	query := "?alt=media"
	if shared {
		query += "&supportsAllDrives=true"
	}
	return base + url.PathEscape(fileID) + query
}
|
|
|
|
func (d *DriveImporter) uploadToNextcloud(ctx context.Context, ncUserID, targetPath string, content io.ReadCloser, contentType string, size int64) error {
|
|
defer content.Close()
|
|
if size > maxDriveFileBytes {
|
|
return d.nc.UploadStreaming(ctx, ncUserID, targetPath, content, contentType, size)
|
|
}
|
|
return d.nc.Upload(ctx, ncUserID, targetPath, content, contentType)
|
|
}
|
|
|
|
func (d *DriveImporter) bootstrapSharedDrives(ctx context.Context, job *Job, accessToken string) error {
|
|
pageToken := ""
|
|
for {
|
|
listURL := "https://www.googleapis.com/drive/v3/drives?pageSize=100&fields=nextPageToken,drives(id,name)"
|
|
if pageToken != "" {
|
|
listURL += "&pageToken=" + url.QueryEscape(pageToken)
|
|
}
|
|
body, err := apiGet(ctx, d.client, listURL, accessToken)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var parsed struct {
|
|
Drives []struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
} `json:"drives"`
|
|
NextPageToken string `json:"nextPageToken"`
|
|
}
|
|
if err := json.Unmarshal(body, &parsed); err != nil {
|
|
return err
|
|
}
|
|
for _, drive := range parsed.Drives {
|
|
if err := d.upsertDiscoveredSharedDrive(ctx, job.ProjectID, job.UserID, drive.ID, drive.Name, d.sharedDriveMode); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if parsed.NextPageToken == "" {
|
|
break
|
|
}
|
|
pageToken = parsed.NextPageToken
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *DriveImporter) mergeSharedDriveFolders(ctx context.Context, job *Job, provider string) error {
|
|
if provider != "google" {
|
|
return nil
|
|
}
|
|
queue := readDriveFolderQueue(job.CursorJSON, provider)
|
|
sharedFolders, err := d.loadApprovedSharedDriveFolders(ctx, job.ProjectID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, folder := range sharedFolders {
|
|
queue = enqueueDriveFolder(queue, folder)
|
|
}
|
|
writeDriveFolderQueue(job.CursorJSON, queue)
|
|
return nil
|
|
}
|
|
|