- Added endpoints for listing and importing migration rosters. - Introduced audit export functionality for migration jobs in CSV and NDJSON formats. - Implemented tenant mismatch validation for Microsoft migration claims. - Enhanced error handling for email claiming and migration processes. - Added integration tests for roster import and claim workflows.
320 lines
7.9 KiB
Go
320 lines
7.9 KiB
Go
package migration
|
|
|
|
import (
|
|
"context"
|
|
"encoding/csv"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"time"
|
|
)
|
|
|
|
// jobAuditExportFlushEvery is the number of exported rows written between
// flushes of the output stream.
const jobAuditExportFlushEvery = 100

// CSV header rows for audit exports. The project-level header has the same
// columns as the job-level one, preceded by job_id.
var (
	jobAuditCSVHeaders     = []string{"item_id", "rel_path", "status", "error", "service", "timestamp"}
	projectAuditCSVHeaders = []string{"job_id", "item_id", "rel_path", "status", "error", "service", "timestamp"}
)
|
|
|
|
// JobAuditExportMeta describes the download produced by a migration audit
// export.
type JobAuditExportMeta struct {
	// ContentType is the media type of the export body, including its charset.
	ContentType string
	// FileName is the file name generated for the export.
	FileName string
}
|
|
|
|
// JobAuditExportRow is a single line of an audit export. The JSON tags define
// the NDJSON field names; job_id, rel_path and error are omitted when empty.
type JobAuditExportRow struct {
	// JobID identifies the migration job the item belongs to.
	JobID string `json:"job_id,omitempty"`
	// ItemID identifies the exported item.
	ItemID string `json:"item_id"`
	// RelPath is the item's relative path.
	RelPath string `json:"rel_path,omitempty"`
	// Status is the item's recorded status.
	Status string `json:"status"`
	// Error is the reason recorded for the item, if any.
	Error string `json:"error,omitempty"`
	// Service names the service associated with the job.
	Service string `json:"service"`
	// Timestamp is the item's import time rendered as text.
	Timestamp string `json:"timestamp"`
}
|
|
|
|
// PrepareJobAuditExport validates the job belongs to the project and returns download metadata.
|
|
func (s *Service) PrepareJobAuditExport(ctx context.Context, projectID, jobID, format string) (JobAuditExportMeta, error) {
|
|
if _, err := s.verifyJobInProject(ctx, projectID, jobID); err != nil {
|
|
return JobAuditExportMeta{}, err
|
|
}
|
|
return jobAuditExportMeta(format, jobID, false), nil
|
|
}
|
|
|
|
// PrepareProjectAuditExport validates the project and returns download metadata.
|
|
func (s *Service) PrepareProjectAuditExport(ctx context.Context, projectID, format string) (JobAuditExportMeta, error) {
|
|
if err := s.verifyProjectExists(ctx, projectID); err != nil {
|
|
return JobAuditExportMeta{}, err
|
|
}
|
|
return jobAuditExportMeta(format, projectID, true), nil
|
|
}
|
|
|
|
// WriteJobAuditExport streams audit rows for one job to w. Call PrepareJobAuditExport first.
|
|
func (s *Service) WriteJobAuditExport(
|
|
ctx context.Context,
|
|
projectID, jobID, statusFilter, format string,
|
|
w io.Writer,
|
|
) error {
|
|
service, err := s.verifyJobInProject(ctx, projectID, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
statusFilter = normalizeAuditStatusFilter(statusFilter)
|
|
|
|
listSQL := `
|
|
SELECT source_id, rel_path, status, reason, imported_at::text
|
|
FROM migration_imported_items
|
|
WHERE job_id = $1::uuid
|
|
`
|
|
listArgs := []any{jobID}
|
|
if statusFilter != "" {
|
|
listSQL += ` AND status = $2`
|
|
listArgs = append(listArgs, statusFilter)
|
|
}
|
|
listSQL += ` ORDER BY imported_at DESC, source_id ASC`
|
|
|
|
rows, err := s.db.Query(ctx, listSQL, listArgs...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
|
|
if err := streamJobAuditRows(format, w, service, jobID, rows); err != nil {
|
|
return err
|
|
}
|
|
return rows.Err()
|
|
}
|
|
|
|
// WriteProjectAuditExport streams audit rows for all jobs in a project to w.
|
|
func (s *Service) WriteProjectAuditExport(
|
|
ctx context.Context,
|
|
projectID, statusFilter, format string,
|
|
w io.Writer,
|
|
) error {
|
|
if err := s.verifyProjectExists(ctx, projectID); err != nil {
|
|
return err
|
|
}
|
|
|
|
statusFilter = normalizeAuditStatusFilter(statusFilter)
|
|
|
|
listSQL := `
|
|
SELECT j.id::text, j.service, i.source_id, i.rel_path, i.status, i.reason, i.imported_at::text
|
|
FROM migration_imported_items i
|
|
JOIN migration_jobs j ON j.id = i.job_id
|
|
WHERE j.project_id = $1::uuid
|
|
`
|
|
listArgs := []any{projectID}
|
|
if statusFilter != "" {
|
|
listSQL += ` AND i.status = $2`
|
|
listArgs = append(listArgs, statusFilter)
|
|
}
|
|
listSQL += ` ORDER BY i.imported_at DESC, i.source_id ASC`
|
|
|
|
rows, err := s.db.Query(ctx, listSQL, listArgs...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
|
|
if err := streamProjectAuditRows(format, w, rows); err != nil {
|
|
return err
|
|
}
|
|
return rows.Err()
|
|
}
|
|
|
|
func (s *Service) verifyProjectExists(ctx context.Context, projectID string) error {
|
|
var exists bool
|
|
err := s.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM migration_projects WHERE id = $1::uuid)`, projectID).Scan(&exists)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !exists {
|
|
return fmt.Errorf("project not found")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func jobAuditExportMeta(format, id string, projectLevel bool) JobAuditExportMeta {
|
|
now := time.Now().UTC().Format("20060102T150405Z")
|
|
shortID := id
|
|
if len(shortID) > 8 {
|
|
shortID = shortID[:8]
|
|
}
|
|
prefix := "migration-job-audit"
|
|
if projectLevel {
|
|
prefix = "migration-project-audit"
|
|
}
|
|
ext := "ndjson"
|
|
contentType := "application/x-ndjson; charset=utf-8"
|
|
if format == "csv" {
|
|
ext = "csv"
|
|
contentType = "text/csv; charset=utf-8"
|
|
}
|
|
return JobAuditExportMeta{
|
|
ContentType: contentType,
|
|
FileName: fmt.Sprintf("%s-%s-%s.%s", prefix, shortID, now, ext),
|
|
}
|
|
}
|
|
|
|
// jobAuditRowScanner is the row-iteration surface the audit streamers need
// from a query result, kept minimal so any cursor with these three methods can
// be streamed.
type jobAuditRowScanner interface {
	// Next advances to the next row and reports whether one is available.
	Next() bool
	// Scan copies the current row's columns into dest.
	Scan(dest ...any) error
	// Err returns the error, if any, that ended iteration.
	Err() error
}
|
|
|
|
func streamJobAuditRows(format string, w io.Writer, service, jobID string, rows jobAuditRowScanner) error {
|
|
flusher, _ := w.(http.Flusher)
|
|
switch format {
|
|
case "csv":
|
|
cw := csv.NewWriter(w)
|
|
if err := cw.Write(jobAuditCSVHeaders); err != nil {
|
|
return err
|
|
}
|
|
count := 0
|
|
for rows.Next() {
|
|
var itemID, relPath, status, reason, importedAt string
|
|
if err := rows.Scan(&itemID, &relPath, &status, &reason, &importedAt); err != nil {
|
|
return err
|
|
}
|
|
if err := writeJobAuditCSVRow(cw, JobAuditExportRow{
|
|
JobID: jobID,
|
|
ItemID: itemID,
|
|
RelPath: relPath,
|
|
Status: status,
|
|
Error: reason,
|
|
Service: service,
|
|
Timestamp: importedAt,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
count++
|
|
if count%jobAuditExportFlushEvery == 0 {
|
|
cw.Flush()
|
|
if err := cw.Error(); err != nil {
|
|
return err
|
|
}
|
|
if flusher != nil {
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
}
|
|
cw.Flush()
|
|
return cw.Error()
|
|
default:
|
|
enc := json.NewEncoder(w)
|
|
count := 0
|
|
for rows.Next() {
|
|
var itemID, relPath, status, reason, importedAt string
|
|
if err := rows.Scan(&itemID, &relPath, &status, &reason, &importedAt); err != nil {
|
|
return err
|
|
}
|
|
if err := enc.Encode(JobAuditExportRow{
|
|
JobID: jobID,
|
|
ItemID: itemID,
|
|
RelPath: relPath,
|
|
Status: status,
|
|
Error: reason,
|
|
Service: service,
|
|
Timestamp: importedAt,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
count++
|
|
if count%jobAuditExportFlushEvery == 0 && flusher != nil {
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func streamProjectAuditRows(format string, w io.Writer, rows jobAuditRowScanner) error {
|
|
flusher, _ := w.(http.Flusher)
|
|
switch format {
|
|
case "csv":
|
|
cw := csv.NewWriter(w)
|
|
if err := cw.Write(projectAuditCSVHeaders); err != nil {
|
|
return err
|
|
}
|
|
count := 0
|
|
for rows.Next() {
|
|
var jobID, service, itemID, relPath, status, reason, importedAt string
|
|
if err := rows.Scan(&jobID, &service, &itemID, &relPath, &status, &reason, &importedAt); err != nil {
|
|
return err
|
|
}
|
|
if err := writeProjectAuditCSVRow(cw, JobAuditExportRow{
|
|
JobID: jobID,
|
|
ItemID: itemID,
|
|
RelPath: relPath,
|
|
Status: status,
|
|
Error: reason,
|
|
Service: service,
|
|
Timestamp: importedAt,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
count++
|
|
if count%jobAuditExportFlushEvery == 0 {
|
|
cw.Flush()
|
|
if err := cw.Error(); err != nil {
|
|
return err
|
|
}
|
|
if flusher != nil {
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
}
|
|
cw.Flush()
|
|
return cw.Error()
|
|
default:
|
|
enc := json.NewEncoder(w)
|
|
count := 0
|
|
for rows.Next() {
|
|
var jobID, service, itemID, relPath, status, reason, importedAt string
|
|
if err := rows.Scan(&jobID, &service, &itemID, &relPath, &status, &reason, &importedAt); err != nil {
|
|
return err
|
|
}
|
|
if err := enc.Encode(JobAuditExportRow{
|
|
JobID: jobID,
|
|
ItemID: itemID,
|
|
RelPath: relPath,
|
|
Status: status,
|
|
Error: reason,
|
|
Service: service,
|
|
Timestamp: importedAt,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
count++
|
|
if count%jobAuditExportFlushEvery == 0 && flusher != nil {
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func writeJobAuditCSVRow(w *csv.Writer, row JobAuditExportRow) error {
|
|
return w.Write([]string{
|
|
row.ItemID,
|
|
row.RelPath,
|
|
row.Status,
|
|
row.Error,
|
|
row.Service,
|
|
row.Timestamp,
|
|
})
|
|
}
|
|
|
|
func writeProjectAuditCSVRow(w *csv.Writer, row JobAuditExportRow) error {
|
|
return w.Write([]string{
|
|
row.JobID,
|
|
row.ItemID,
|
|
row.RelPath,
|
|
row.Status,
|
|
row.Error,
|
|
row.Service,
|
|
row.Timestamp,
|
|
})
|
|
}
|