ultisuite-backend/internal/migration/drive_shared.go
R3D347HR4Y 1ffd0817d8
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
feat(migration): enhance migration API with roster and audit export features
- Added endpoints for listing and importing migration rosters.
- Introduced audit export functionality for migration jobs in CSV and NDJSON formats.
- Implemented tenant mismatch validation for Microsoft migration claims.
- Enhanced error handling for email claiming and migration processes.
- Added integration tests for roster import and claim workflows.
2026-06-13 13:11:30 +02:00

238 lines
6.9 KiB
Go

package migration
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
// Shared drive modes accepted by NormalizeSharedDriveMode. The mode decides the
// status a newly discovered shared drive receives.
const (
	// SharedDriveModeAuto approves discovered shared drives immediately.
	SharedDriveModeAuto = "auto"
	// SharedDriveModeManual leaves discovered shared drives pending.
	SharedDriveModeManual = "manual"
)

// Status values stored in the status column of migration_shared_drives.
const (
	// SharedDriveStatusPending is the initial status in manual mode.
	SharedDriveStatusPending = "pending"
	// SharedDriveStatusApproved marks a drive as selected for import.
	SharedDriveStatusApproved = "approved"
	// SharedDriveStatusRejected marks a drive as excluded; rediscovery keeps it rejected.
	SharedDriveStatusRejected = "rejected"
)
// SharedDrive is the JSON representation of one row of migration_shared_drives.
// The queries in this file cast the id, project_id and timestamp columns to
// text, which is why those fields are plain strings.
type SharedDrive struct {
// ID is the row's own identifier (selected as id::text).
ID string `json:"id"`
// ProjectID is the owning migration project (selected as project_id::text).
ProjectID string `json:"project_id"`
// DriveID identifies the shared drive; the upsert in this file conflicts on
// (project_id, drive_id), so it is unique within a project.
DriveID string `json:"drive_id"`
// Name is the drive's display name as last recorded by discovery.
Name string `json:"name"`
// Status is the stored status string. Presumably one of the
// SharedDriveStatus* constants, but nothing in this file enforces that.
Status string `json:"status"`
// DiscoveredByUserID is nil when the column is NULL and is then omitted from JSON.
DiscoveredByUserID *string `json:"discovered_by_user_id,omitempty"`
// CreatedAt is created_at::text, i.e. the database's text rendering of the timestamp.
CreatedAt string `json:"created_at"`
// UpdatedAt is updated_at::text; the updates in this file set the column to NOW().
UpdatedAt string `json:"updated_at"`
}
// NormalizeSharedDriveMode maps an arbitrary mode string onto a supported
// shared drive mode. Only the exact value SharedDriveModeManual selects manual
// mode; every other input, including the empty string, yields
// SharedDriveModeAuto.
func NormalizeSharedDriveMode(mode string) string {
	if mode == SharedDriveModeManual {
		return SharedDriveModeManual
	}
	return SharedDriveModeAuto
}
// SharedDriveItemStore tracks project-level imports for shared drive files (cross-user dedup).
//
// Lookups (Has) are answered from the in-memory set only. When a database pool
// and project ID are configured, MarkImported also inserts into
// migration_shared_drive_items and LoadSharedDriveItemStore pre-fills the set
// from that table.
//
// NOTE(review): the map is read and written without locking in this file —
// confirm that callers do not share one store across goroutines.
type SharedDriveItemStore struct {
// db is the optional connection pool; nil keeps the store memory-only.
db *pgxpool.Pool
// projectID scopes database reads and writes; empty disables persistence.
projectID string
// done is the set of items already imported.
done map[string]struct{} // key: driveID + ":" + sourceID
}
// NewSharedDriveItemStoreMemory returns a store with no database pool and no
// project ID, so imported items are tracked only in the in-memory set.
func NewSharedDriveItemStoreMemory() *SharedDriveItemStore {
	store := new(SharedDriveItemStore)
	store.done = make(map[string]struct{})
	return store
}
// LoadSharedDriveItemStore builds a store bound to db and projectID and fills
// its in-memory set with every (drive_id, source_id) pair recorded for the
// project in migration_shared_drive_items.
//
// When db is nil or projectID is empty no query is issued and an empty store is
// returned. A query or scan failure yields a nil store and the error. An
// iteration error reported by rows.Err() is returned together with the store
// populated so far.
func LoadSharedDriveItemStore(ctx context.Context, db *pgxpool.Pool, projectID string) (*SharedDriveItemStore, error) {
	loaded := &SharedDriveItemStore{db: db, projectID: projectID, done: make(map[string]struct{})}
	if db == nil || projectID == "" {
		return loaded, nil
	}

	const query = `
SELECT drive_id, source_id
FROM migration_shared_drive_items
WHERE project_id = $1::uuid
`
	rows, err := db.Query(ctx, query, projectID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var drive, source string
	for rows.Next() {
		if scanErr := rows.Scan(&drive, &source); scanErr != nil {
			return nil, scanErr
		}
		loaded.done[sharedDriveItemKey(drive, source)] = struct{}{}
	}
	return loaded, rows.Err()
}
// sharedDriveItemKey builds the in-memory set key for an item: the drive ID and
// the source ID joined by a single colon.
func sharedDriveItemKey(driveID, sourceID string) string {
	const separator = ":"
	return driveID + separator + sourceID
}
// Has reports whether the item identified by driveID and sourceID is present in
// the in-memory set. It returns false for a nil store and whenever either
// identifier is empty. The database is never consulted.
func (s *SharedDriveItemStore) Has(driveID, sourceID string) bool {
	if s == nil {
		return false
	}
	if driveID == "" || sourceID == "" {
		return false
	}
	_, seen := s.done[sharedDriveItemKey(driveID, sourceID)]
	return seen
}
// MarkImported records that the item identified by driveID and sourceID has
// been imported.
//
// The item is always added to the in-memory set first. When the store has a
// database pool and a project ID, a row is also inserted into
// migration_shared_drive_items; an existing row for the same
// (project_id, drive_id, source_id) is left untouched. An empty jobID is stored
// as NULL.
//
// The call is a no-op returning nil when the store is nil or either identifier
// is empty, mirroring Has. A zero-value store (nil map) is initialised lazily
// instead of panicking on the map write.
//
// The returned error, if any, is the one reported by the INSERT.
func (s *SharedDriveItemStore) MarkImported(ctx context.Context, driveID, sourceID, relPath, jobID string) error {
	if s == nil || driveID == "" || sourceID == "" {
		return nil
	}
	if s.done == nil {
		// Assigning into a nil map panics; allocate on first use.
		s.done = make(map[string]struct{})
	}
	s.done[sharedDriveItemKey(driveID, sourceID)] = struct{}{}
	if s.db == nil || s.projectID == "" {
		return nil
	}
	_, err := s.db.Exec(ctx, `
INSERT INTO migration_shared_drive_items (project_id, drive_id, source_id, rel_path, imported_by_job_id)
VALUES ($1::uuid, $2, $3, $4, NULLIF($5, '')::uuid)
ON CONFLICT (project_id, drive_id, source_id) DO NOTHING
`, s.projectID, driveID, sourceID, relPath, jobID)
	return err
}
// UpdateSharedDriveMode stores the shared drive mode of a migration project and
// returns the updated project.
//
// mode is passed through NormalizeSharedDriveMode first, so any value other
// than SharedDriveModeManual is stored as SharedDriveModeAuto. updated_at is
// set to NOW().
//
// On failure the zero Project is returned together with the unmodified error
// from the query or scan, so callers never see a partially scanned project.
func (s *Service) UpdateSharedDriveMode(ctx context.Context, projectID, mode string) (Project, error) {
	mode = NormalizeSharedDriveMode(mode)
	sc := newProjectScanner()
	err := s.db.QueryRow(ctx, `
UPDATE migration_projects
SET shared_drive_mode = $2, updated_at = NOW()
WHERE id = $1::uuid
RETURNING `+projectSelectSQL("")+`
`, projectID, mode).Scan(sc.targets()...)
	if err != nil {
		// Scan targets may hold partial data after a failure; do not expose them.
		var zero Project
		return zero, err
	}
	return sc.result(), nil
}
// ListSharedDrives returns the shared drives recorded for projectID, ordered by
// name and then by creation time. A non-empty statusFilter restricts the result
// to rows whose status equals it exactly; an empty filter returns every row.
//
// The returned slice is nil when no row matches. Query, scan and iteration
// errors are returned unchanged.
func (s *Service) ListSharedDrives(ctx context.Context, projectID, statusFilter string) ([]SharedDrive, error) {
	const baseQuery = `
SELECT id::text, project_id::text, drive_id, name, status,
NULLIF(discovered_by_user_id::text, ''), created_at::text, updated_at::text
FROM migration_shared_drives
WHERE project_id = $1::uuid
`
	stmt := baseQuery
	params := []any{projectID}
	if statusFilter != "" {
		// The filter is bound as $2, never spliced into the SQL text.
		stmt += ` AND status = $2`
		params = append(params, statusFilter)
	}
	stmt += ` ORDER BY name ASC, created_at ASC`

	rows, err := s.db.Query(ctx, stmt, params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var drives []SharedDrive
	for rows.Next() {
		var drive SharedDrive
		scanErr := rows.Scan(
			&drive.ID, &drive.ProjectID, &drive.DriveID, &drive.Name, &drive.Status,
			&drive.DiscoveredByUserID, &drive.CreatedAt, &drive.UpdatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		drives = append(drives, drive)
	}
	return drives, rows.Err()
}
// SetSharedDriveStatus sets the status of the shared drive identified by
// (projectID, driveID), sets updated_at to NOW(), and returns the updated row.
//
// status is written as given; this function does not validate it.
//
// Errors:
//   - "shared drive not found" when the UPDATE matches no row;
//   - any other failure (connection, cancelled context, malformed project ID,
//     scan error) is returned wrapped with %w, so the cause is not mistaken for
//     a missing drive and stays reachable through errors.Is / errors.As.
//
// The zero SharedDrive is returned alongside every non-nil error.
func (s *Service) SetSharedDriveStatus(ctx context.Context, projectID, driveID, status string) (SharedDrive, error) {
	// Query rather than QueryRow: an empty result can then be told apart from a
	// real failure without depending on the driver's no-rows sentinel.
	rows, err := s.db.Query(ctx, `
UPDATE migration_shared_drives
SET status = $3, updated_at = NOW()
WHERE project_id = $1::uuid AND drive_id = $2
RETURNING id::text, project_id::text, drive_id, name, status,
NULLIF(discovered_by_user_id::text, ''), created_at::text, updated_at::text
`, projectID, driveID, status)
	if err != nil {
		return SharedDrive{}, fmt.Errorf("updating shared drive %q status: %w", driveID, err)
	}
	defer rows.Close()

	if !rows.Next() {
		if err := rows.Err(); err != nil {
			return SharedDrive{}, fmt.Errorf("updating shared drive %q status: %w", driveID, err)
		}
		return SharedDrive{}, fmt.Errorf("shared drive not found")
	}

	var row SharedDrive
	if err := rows.Scan(
		&row.ID, &row.ProjectID, &row.DriveID, &row.Name, &row.Status,
		&row.DiscoveredByUserID, &row.CreatedAt, &row.UpdatedAt,
	); err != nil {
		return SharedDrive{}, fmt.Errorf("scanning updated shared drive %q: %w", driveID, err)
	}

	// Close before the final check so an error raised while the statement
	// completes is reported through rows.Err() instead of being lost.
	rows.Close()
	if err := rows.Err(); err != nil {
		return SharedDrive{}, fmt.Errorf("updating shared drive %q status: %w", driveID, err)
	}
	return row, nil
}
// ApproveSharedDrive sets the drive's status to SharedDriveStatusApproved via
// SetSharedDriveStatus and returns that call's result unchanged.
func (s *Service) ApproveSharedDrive(ctx context.Context, projectID, driveID string) (SharedDrive, error) {
	const target = SharedDriveStatusApproved
	return s.SetSharedDriveStatus(ctx, projectID, driveID, target)
}
// RejectSharedDrive sets the drive's status to SharedDriveStatusRejected via
// SetSharedDriveStatus and returns that call's result unchanged.
func (s *Service) RejectSharedDrive(ctx context.Context, projectID, driveID string) (SharedDrive, error) {
	const target = SharedDriveStatusRejected
	return s.SetSharedDriveStatus(ctx, projectID, driveID, target)
}
// upsertDiscoveredSharedDrive records a shared drive seen during discovery.
//
// A new row starts as approved when the normalised mode is auto and as pending
// otherwise. An empty userID is stored as NULL. When a row for
// (project_id, drive_id) already exists:
//   - the name is replaced unless the incoming name is empty;
//   - rejected and approved statuses are kept as they are;
//   - any other status becomes approved in auto mode and is kept otherwise;
//   - updated_at is set to NOW().
//
// It does nothing and returns nil when the importer has no database handle.
func (d *DriveImporter) upsertDiscoveredSharedDrive(ctx context.Context, projectID, userID, driveID, name, mode string) error {
	if d.db == nil {
		return nil
	}

	normalized := NormalizeSharedDriveMode(mode)
	var status string
	switch normalized {
	case SharedDriveModeAuto:
		status = SharedDriveStatusApproved
	default:
		status = SharedDriveStatusPending
	}

	const upsert = `
INSERT INTO migration_shared_drives (project_id, drive_id, name, status, discovered_by_user_id)
VALUES ($1::uuid, $2, $3, $4, NULLIF($5, '')::uuid)
ON CONFLICT (project_id, drive_id) DO UPDATE
SET name = COALESCE(NULLIF(EXCLUDED.name, ''), migration_shared_drives.name),
status = CASE
WHEN migration_shared_drives.status = 'rejected' THEN 'rejected'
WHEN migration_shared_drives.status = 'approved' THEN 'approved'
WHEN $6 = 'auto' THEN 'approved'
ELSE migration_shared_drives.status
END,
updated_at = NOW()
`
	_, err := d.db.Exec(ctx, upsert, projectID, driveID, name, status, userID, normalized)
	return err
}
// loadApprovedSharedDriveFolders returns one folder reference per shared drive
// of the project whose status is 'approved', ordered by name.
//
// Each reference uses the drive ID as both ID and DriveID, is flagged Shared,
// and gets the path produced by pathJoinSharedDrive for the drive name. The
// result is nil, with a nil error, when the importer has no database handle or
// no drive is approved.
func (d *DriveImporter) loadApprovedSharedDriveFolders(ctx context.Context, projectID string) ([]driveFolderRef, error) {
	if d.db == nil {
		return nil, nil
	}

	const query = `
SELECT drive_id, name
FROM migration_shared_drives
WHERE project_id = $1::uuid AND status = 'approved'
ORDER BY name ASC
`
	rows, err := d.db.Query(ctx, query, projectID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var folders []driveFolderRef
	for rows.Next() {
		var driveID, driveName string
		if scanErr := rows.Scan(&driveID, &driveName); scanErr != nil {
			return nil, scanErr
		}
		ref := driveFolderRef{Shared: true}
		ref.ID = driveID
		ref.DriveID = driveID
		ref.Path = pathJoinSharedDrive(driveName)
		folders = append(folders, ref)
	}
	return folders, rows.Err()
}
// pathJoinSharedDrive returns the destination path for a shared drive: the
// fixed "Shared Drives/" prefix followed by the drive name after it has been
// passed through sanitizeDrivePath.
func pathJoinSharedDrive(name string) string {
	const root = "Shared Drives/"
	return root + sanitizeDrivePath(name)
}