package migration import ( "context" "encoding/csv" "encoding/json" "fmt" "io" "net/http" "time" ) const jobAuditExportFlushEvery = 100 var jobAuditCSVHeaders = []string{"item_id", "rel_path", "status", "error", "service", "timestamp"} var projectAuditCSVHeaders = []string{"job_id", "item_id", "rel_path", "status", "error", "service", "timestamp"} // JobAuditExportMeta describes a migration job audit export download. type JobAuditExportMeta struct { ContentType string FileName string } // JobAuditExportRow is one exported audit line. type JobAuditExportRow struct { JobID string `json:"job_id,omitempty"` ItemID string `json:"item_id"` RelPath string `json:"rel_path,omitempty"` Status string `json:"status"` Error string `json:"error,omitempty"` Service string `json:"service"` Timestamp string `json:"timestamp"` } // PrepareJobAuditExport validates the job belongs to the project and returns download metadata. func (s *Service) PrepareJobAuditExport(ctx context.Context, projectID, jobID, format string) (JobAuditExportMeta, error) { if _, err := s.verifyJobInProject(ctx, projectID, jobID); err != nil { return JobAuditExportMeta{}, err } return jobAuditExportMeta(format, jobID, false), nil } // PrepareProjectAuditExport validates the project and returns download metadata. func (s *Service) PrepareProjectAuditExport(ctx context.Context, projectID, format string) (JobAuditExportMeta, error) { if err := s.verifyProjectExists(ctx, projectID); err != nil { return JobAuditExportMeta{}, err } return jobAuditExportMeta(format, projectID, true), nil } // WriteJobAuditExport streams audit rows for one job to w. Call PrepareJobAuditExport first. func (s *Service) WriteJobAuditExport( ctx context.Context, projectID, jobID, statusFilter, format string, w io.Writer, ) error { service, err := s.verifyJobInProject(ctx, projectID, jobID) if err != nil { return err } statusFilter = normalizeAuditStatusFilter(statusFilter) listSQL := ` SELECT source_id, rel_path, status, reason, imported_at::text FROM migration_imported_items WHERE job_id = $1::uuid ` listArgs := []any{jobID} if statusFilter != "" { listSQL += ` AND status = $2` listArgs = append(listArgs, statusFilter) } listSQL += ` ORDER BY imported_at DESC, source_id ASC` rows, err := s.db.Query(ctx, listSQL, listArgs...) if err != nil { return err } defer rows.Close() if err := streamJobAuditRows(format, w, service, jobID, rows); err != nil { return err } return rows.Err() } // WriteProjectAuditExport streams audit rows for all jobs in a project to w. func (s *Service) WriteProjectAuditExport( ctx context.Context, projectID, statusFilter, format string, w io.Writer, ) error { if err := s.verifyProjectExists(ctx, projectID); err != nil { return err } statusFilter = normalizeAuditStatusFilter(statusFilter) listSQL := ` SELECT j.id::text, j.service, i.source_id, i.rel_path, i.status, i.reason, i.imported_at::text FROM migration_imported_items i JOIN migration_jobs j ON j.id = i.job_id WHERE j.project_id = $1::uuid ` listArgs := []any{projectID} if statusFilter != "" { listSQL += ` AND i.status = $2` listArgs = append(listArgs, statusFilter) } listSQL += ` ORDER BY i.imported_at DESC, i.source_id ASC` rows, err := s.db.Query(ctx, listSQL, listArgs...) if err != nil { return err } defer rows.Close() if err := streamProjectAuditRows(format, w, rows); err != nil { return err } return rows.Err() } func (s *Service) verifyProjectExists(ctx context.Context, projectID string) error { var exists bool err := s.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM migration_projects WHERE id = $1::uuid)`, projectID).Scan(&exists) if err != nil { return err } if !exists { return fmt.Errorf("project not found") } return nil } func jobAuditExportMeta(format, id string, projectLevel bool) JobAuditExportMeta { now := time.Now().UTC().Format("20060102T150405Z") shortID := id if len(shortID) > 8 { shortID = shortID[:8] } prefix := "migration-job-audit" if projectLevel { prefix = "migration-project-audit" } ext := "ndjson" contentType := "application/x-ndjson; charset=utf-8" if format == "csv" { ext = "csv" contentType = "text/csv; charset=utf-8" } return JobAuditExportMeta{ ContentType: contentType, FileName: fmt.Sprintf("%s-%s-%s.%s", prefix, shortID, now, ext), } } type jobAuditRowScanner interface { Next() bool Scan(dest ...any) error Err() error } func streamJobAuditRows(format string, w io.Writer, service, jobID string, rows jobAuditRowScanner) error { flusher, _ := w.(http.Flusher) switch format { case "csv": cw := csv.NewWriter(w) if err := cw.Write(jobAuditCSVHeaders); err != nil { return err } count := 0 for rows.Next() { var itemID, relPath, status, reason, importedAt string if err := rows.Scan(&itemID, &relPath, &status, &reason, &importedAt); err != nil { return err } if err := writeJobAuditCSVRow(cw, JobAuditExportRow{ JobID: jobID, ItemID: itemID, RelPath: relPath, Status: status, Error: reason, Service: service, Timestamp: importedAt, }); err != nil { return err } count++ if count%jobAuditExportFlushEvery == 0 { cw.Flush() if err := cw.Error(); err != nil { return err } if flusher != nil { flusher.Flush() } } } cw.Flush() return cw.Error() default: enc := json.NewEncoder(w) count := 0 for rows.Next() { var itemID, relPath, status, reason, importedAt string if err := rows.Scan(&itemID, &relPath, &status, &reason, &importedAt); err != nil { return err } if err := enc.Encode(JobAuditExportRow{ JobID: jobID, ItemID: itemID, RelPath: relPath, Status: status, Error: reason, Service: service, Timestamp: importedAt, }); err != nil { return err } count++ if count%jobAuditExportFlushEvery == 0 && flusher != nil { flusher.Flush() } } return nil } } func streamProjectAuditRows(format string, w io.Writer, rows jobAuditRowScanner) error { flusher, _ := w.(http.Flusher) switch format { case "csv": cw := csv.NewWriter(w) if err := cw.Write(projectAuditCSVHeaders); err != nil { return err } count := 0 for rows.Next() { var jobID, service, itemID, relPath, status, reason, importedAt string if err := rows.Scan(&jobID, &service, &itemID, &relPath, &status, &reason, &importedAt); err != nil { return err } if err := writeProjectAuditCSVRow(cw, JobAuditExportRow{ JobID: jobID, ItemID: itemID, RelPath: relPath, Status: status, Error: reason, Service: service, Timestamp: importedAt, }); err != nil { return err } count++ if count%jobAuditExportFlushEvery == 0 { cw.Flush() if err := cw.Error(); err != nil { return err } if flusher != nil { flusher.Flush() } } } cw.Flush() return cw.Error() default: enc := json.NewEncoder(w) count := 0 for rows.Next() { var jobID, service, itemID, relPath, status, reason, importedAt string if err := rows.Scan(&jobID, &service, &itemID, &relPath, &status, &reason, &importedAt); err != nil { return err } if err := enc.Encode(JobAuditExportRow{ JobID: jobID, ItemID: itemID, RelPath: relPath, Status: status, Error: reason, Service: service, Timestamp: importedAt, }); err != nil { return err } count++ if count%jobAuditExportFlushEvery == 0 && flusher != nil { flusher.Flush() } } return nil } } func writeJobAuditCSVRow(w *csv.Writer, row JobAuditExportRow) error { return w.Write([]string{ row.ItemID, row.RelPath, row.Status, row.Error, row.Service, row.Timestamp, }) } func writeProjectAuditCSVRow(w *csv.Writer, row JobAuditExportRow) error { return w.Write([]string{ row.JobID, row.ItemID, row.RelPath, row.Status, row.Error, row.Service, row.Timestamp, }) }