ultisuite-backend/internal/migration/job_audit_export.go
R3D347HR4Y 1ffd0817d8
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
feat(migration): enhance migration API with roster and audit export features
- Added endpoints for listing and importing migration rosters.
- Introduced audit export functionality for migration jobs in CSV and NDJSON formats.
- Implemented tenant mismatch validation for Microsoft migration claims.
- Enhanced error handling for email claiming and migration processes.
- Added integration tests for roster import and claim workflows.
2026-06-13 13:11:30 +02:00

320 lines
7.9 KiB
Go

package migration
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
const jobAuditExportFlushEvery = 100
var jobAuditCSVHeaders = []string{"item_id", "rel_path", "status", "error", "service", "timestamp"}
var projectAuditCSVHeaders = []string{"job_id", "item_id", "rel_path", "status", "error", "service", "timestamp"}
// JobAuditExportMeta describes a migration job audit export download.
type JobAuditExportMeta struct {
ContentType string
FileName string
}
// JobAuditExportRow is one exported audit line.
type JobAuditExportRow struct {
JobID string `json:"job_id,omitempty"`
ItemID string `json:"item_id"`
RelPath string `json:"rel_path,omitempty"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Service string `json:"service"`
Timestamp string `json:"timestamp"`
}
// PrepareJobAuditExport validates the job belongs to the project and returns download metadata.
func (s *Service) PrepareJobAuditExport(ctx context.Context, projectID, jobID, format string) (JobAuditExportMeta, error) {
if _, err := s.verifyJobInProject(ctx, projectID, jobID); err != nil {
return JobAuditExportMeta{}, err
}
return jobAuditExportMeta(format, jobID, false), nil
}
// PrepareProjectAuditExport validates the project and returns download metadata.
func (s *Service) PrepareProjectAuditExport(ctx context.Context, projectID, format string) (JobAuditExportMeta, error) {
if err := s.verifyProjectExists(ctx, projectID); err != nil {
return JobAuditExportMeta{}, err
}
return jobAuditExportMeta(format, projectID, true), nil
}
// WriteJobAuditExport streams audit rows for one job to w. Call PrepareJobAuditExport first.
func (s *Service) WriteJobAuditExport(
ctx context.Context,
projectID, jobID, statusFilter, format string,
w io.Writer,
) error {
service, err := s.verifyJobInProject(ctx, projectID, jobID)
if err != nil {
return err
}
statusFilter = normalizeAuditStatusFilter(statusFilter)
listSQL := `
SELECT source_id, rel_path, status, reason, imported_at::text
FROM migration_imported_items
WHERE job_id = $1::uuid
`
listArgs := []any{jobID}
if statusFilter != "" {
listSQL += ` AND status = $2`
listArgs = append(listArgs, statusFilter)
}
listSQL += ` ORDER BY imported_at DESC, source_id ASC`
rows, err := s.db.Query(ctx, listSQL, listArgs...)
if err != nil {
return err
}
defer rows.Close()
if err := streamJobAuditRows(format, w, service, jobID, rows); err != nil {
return err
}
return rows.Err()
}
// WriteProjectAuditExport streams audit rows for all jobs in a project to w.
func (s *Service) WriteProjectAuditExport(
ctx context.Context,
projectID, statusFilter, format string,
w io.Writer,
) error {
if err := s.verifyProjectExists(ctx, projectID); err != nil {
return err
}
statusFilter = normalizeAuditStatusFilter(statusFilter)
listSQL := `
SELECT j.id::text, j.service, i.source_id, i.rel_path, i.status, i.reason, i.imported_at::text
FROM migration_imported_items i
JOIN migration_jobs j ON j.id = i.job_id
WHERE j.project_id = $1::uuid
`
listArgs := []any{projectID}
if statusFilter != "" {
listSQL += ` AND i.status = $2`
listArgs = append(listArgs, statusFilter)
}
listSQL += ` ORDER BY i.imported_at DESC, i.source_id ASC`
rows, err := s.db.Query(ctx, listSQL, listArgs...)
if err != nil {
return err
}
defer rows.Close()
if err := streamProjectAuditRows(format, w, rows); err != nil {
return err
}
return rows.Err()
}
func (s *Service) verifyProjectExists(ctx context.Context, projectID string) error {
var exists bool
err := s.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM migration_projects WHERE id = $1::uuid)`, projectID).Scan(&exists)
if err != nil {
return err
}
if !exists {
return fmt.Errorf("project not found")
}
return nil
}
func jobAuditExportMeta(format, id string, projectLevel bool) JobAuditExportMeta {
now := time.Now().UTC().Format("20060102T150405Z")
shortID := id
if len(shortID) > 8 {
shortID = shortID[:8]
}
prefix := "migration-job-audit"
if projectLevel {
prefix = "migration-project-audit"
}
ext := "ndjson"
contentType := "application/x-ndjson; charset=utf-8"
if format == "csv" {
ext = "csv"
contentType = "text/csv; charset=utf-8"
}
return JobAuditExportMeta{
ContentType: contentType,
FileName: fmt.Sprintf("%s-%s-%s.%s", prefix, shortID, now, ext),
}
}
type jobAuditRowScanner interface {
Next() bool
Scan(dest ...any) error
Err() error
}
func streamJobAuditRows(format string, w io.Writer, service, jobID string, rows jobAuditRowScanner) error {
flusher, _ := w.(http.Flusher)
switch format {
case "csv":
cw := csv.NewWriter(w)
if err := cw.Write(jobAuditCSVHeaders); err != nil {
return err
}
count := 0
for rows.Next() {
var itemID, relPath, status, reason, importedAt string
if err := rows.Scan(&itemID, &relPath, &status, &reason, &importedAt); err != nil {
return err
}
if err := writeJobAuditCSVRow(cw, JobAuditExportRow{
JobID: jobID,
ItemID: itemID,
RelPath: relPath,
Status: status,
Error: reason,
Service: service,
Timestamp: importedAt,
}); err != nil {
return err
}
count++
if count%jobAuditExportFlushEvery == 0 {
cw.Flush()
if err := cw.Error(); err != nil {
return err
}
if flusher != nil {
flusher.Flush()
}
}
}
cw.Flush()
return cw.Error()
default:
enc := json.NewEncoder(w)
count := 0
for rows.Next() {
var itemID, relPath, status, reason, importedAt string
if err := rows.Scan(&itemID, &relPath, &status, &reason, &importedAt); err != nil {
return err
}
if err := enc.Encode(JobAuditExportRow{
JobID: jobID,
ItemID: itemID,
RelPath: relPath,
Status: status,
Error: reason,
Service: service,
Timestamp: importedAt,
}); err != nil {
return err
}
count++
if count%jobAuditExportFlushEvery == 0 && flusher != nil {
flusher.Flush()
}
}
return nil
}
}
func streamProjectAuditRows(format string, w io.Writer, rows jobAuditRowScanner) error {
flusher, _ := w.(http.Flusher)
switch format {
case "csv":
cw := csv.NewWriter(w)
if err := cw.Write(projectAuditCSVHeaders); err != nil {
return err
}
count := 0
for rows.Next() {
var jobID, service, itemID, relPath, status, reason, importedAt string
if err := rows.Scan(&jobID, &service, &itemID, &relPath, &status, &reason, &importedAt); err != nil {
return err
}
if err := writeProjectAuditCSVRow(cw, JobAuditExportRow{
JobID: jobID,
ItemID: itemID,
RelPath: relPath,
Status: status,
Error: reason,
Service: service,
Timestamp: importedAt,
}); err != nil {
return err
}
count++
if count%jobAuditExportFlushEvery == 0 {
cw.Flush()
if err := cw.Error(); err != nil {
return err
}
if flusher != nil {
flusher.Flush()
}
}
}
cw.Flush()
return cw.Error()
default:
enc := json.NewEncoder(w)
count := 0
for rows.Next() {
var jobID, service, itemID, relPath, status, reason, importedAt string
if err := rows.Scan(&jobID, &service, &itemID, &relPath, &status, &reason, &importedAt); err != nil {
return err
}
if err := enc.Encode(JobAuditExportRow{
JobID: jobID,
ItemID: itemID,
RelPath: relPath,
Status: status,
Error: reason,
Service: service,
Timestamp: importedAt,
}); err != nil {
return err
}
count++
if count%jobAuditExportFlushEvery == 0 && flusher != nil {
flusher.Flush()
}
}
return nil
}
}
func writeJobAuditCSVRow(w *csv.Writer, row JobAuditExportRow) error {
return w.Write([]string{
row.ItemID,
row.RelPath,
row.Status,
row.Error,
row.Service,
row.Timestamp,
})
}
func writeProjectAuditCSVRow(w *csv.Writer, row JobAuditExportRow) error {
return w.Write([]string{
row.JobID,
row.ItemID,
row.RelPath,
row.Status,
row.Error,
row.Service,
row.Timestamp,
})
}