ultisuite-backend/internal/migration/imported_items.go
R3D347HR4Y 7143a36c19
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
feat(mail): integrate Stalwart hosted mail and migration features
- Added configuration options for Stalwart hosted mail in .env.example.
- Updated Docker Compose to include Stalwart service with health checks.
- Introduced new API endpoints for managing mail domains and migration projects.
- Enhanced Authentik blueprints for user enrollment and post-migration security.
- Updated OAuth handling for Google and Microsoft migration processes.
- Improved error handling and response structures in the mail API.
- Added integration tests for email claiming and migration workflows.
2026-06-13 12:47:08 +02:00

207 lines
5.0 KiB
Go

package migration
import (
"context"
"fmt"
"strings"
"github.com/jackc/pgx/v5/pgxpool"
)
// ImportedItemStore tracks imported source IDs and optional relative paths for a migration job.
// Data lives in migration_imported_items instead of unbounded cursor_json maps.
type ImportedItemStore struct {
db *pgxpool.Pool
jobID string
done map[string]struct{} // imported or skipped — resume skips these
paths map[string]string
}
func NewImportedItemStoreMemory() *ImportedItemStore {
return &ImportedItemStore{
done: map[string]struct{}{},
paths: map[string]string{},
}
}
func LoadImportedItemStore(ctx context.Context, db *pgxpool.Pool, jobID string, cursor map[string]any) (*ImportedItemStore, error) {
store := &ImportedItemStore{
db: db,
jobID: jobID,
done: map[string]struct{}{},
paths: map[string]string{},
}
if db != nil && jobID != "" {
rows, err := db.Query(ctx, `
SELECT source_id, rel_path, status
FROM migration_imported_items
WHERE job_id = $1::uuid
`, jobID)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var sourceID, relPath, status string
if err := rows.Scan(&sourceID, &relPath, &status); err != nil {
return nil, err
}
if isImportedItemDone(status) {
store.done[sourceID] = struct{}{}
}
if relPath != "" {
store.paths[sourceID] = relPath
}
}
if err := rows.Err(); err != nil {
return nil, err
}
}
if err := store.migrateLegacyCursor(ctx, cursor); err != nil {
return nil, err
}
stripImportedCursorKeys(cursor)
return store, nil
}
func isImportedItemDone(status string) bool {
switch status {
case "", ItemStatusImported, ItemStatusSkipped:
return true
default:
return false
}
}
func stripImportedCursorKeys(cursor map[string]any) {
if cursor == nil {
return
}
delete(cursor, "imported_ids")
delete(cursor, "imported_paths")
}
func (s *ImportedItemStore) Has(id string) bool {
_, ok := s.done[id]
return ok
}
func (s *ImportedItemStore) Path(id string) string {
return s.paths[id]
}
func (s *ImportedItemStore) MarkImported(ctx context.Context, id string) error {
return s.upsertItem(ctx, id, "", ItemStatusImported, "")
}
func (s *ImportedItemStore) MarkPath(ctx context.Context, id, relPath string) error {
return s.upsertItem(ctx, id, relPath, ItemStatusImported, "")
}
func (s *ImportedItemStore) MarkSkipped(ctx context.Context, id, reason, relPath string) error {
return s.upsertItem(ctx, id, relPath, ItemStatusSkipped, reason)
}
func (s *ImportedItemStore) MarkFailed(ctx context.Context, id, reason, relPath string) error {
delete(s.done, id)
delete(s.paths, id)
if s.db == nil || s.jobID == "" || id == "" {
return nil
}
_, err := s.db.Exec(ctx, `
INSERT INTO migration_imported_items (job_id, source_id, rel_path, status, reason)
VALUES ($1::uuid, $2, $3, $4, $5)
ON CONFLICT (job_id, source_id) DO UPDATE
SET rel_path = EXCLUDED.rel_path,
status = EXCLUDED.status,
reason = EXCLUDED.reason,
imported_at = NOW()
`, s.jobID, id, relPath, ItemStatusFailed, truncateReason(reason))
return err
}
func (s *ImportedItemStore) upsertItem(ctx context.Context, id, relPath, status, reason string) error {
if id == "" {
return nil
}
if isImportedItemDone(status) {
s.done[id] = struct{}{}
} else {
delete(s.done, id)
}
if relPath != "" {
s.paths[id] = relPath
}
if s.db == nil || s.jobID == "" {
return nil
}
_, err := s.db.Exec(ctx, `
INSERT INTO migration_imported_items (job_id, source_id, rel_path, status, reason)
VALUES ($1::uuid, $2, $3, $4, $5)
ON CONFLICT (job_id, source_id) DO UPDATE
SET rel_path = EXCLUDED.rel_path,
status = EXCLUDED.status,
reason = EXCLUDED.reason,
imported_at = NOW()
`, s.jobID, id, relPath, status, truncateReason(reason))
return err
}
func truncateReason(reason string) string {
reason = strings.TrimSpace(reason)
const maxLen = 2000
if len(reason) <= maxLen {
return reason
}
return reason[:maxLen]
}
func (s *ImportedItemStore) Unmark(ctx context.Context, id string) error {
if id == "" {
return nil
}
delete(s.done, id)
delete(s.paths, id)
if s.db == nil || s.jobID == "" {
return nil
}
_, err := s.db.Exec(ctx, `
DELETE FROM migration_imported_items
WHERE job_id = $1::uuid AND source_id = $2
`, s.jobID, id)
return err
}
func (s *ImportedItemStore) migrateLegacyCursor(ctx context.Context, cursor map[string]any) error {
if cursor == nil {
return nil
}
rawIDs, _ := cursor["imported_ids"].(map[string]any)
rawPaths, _ := cursor["imported_paths"].(map[string]any)
if len(rawIDs) == 0 && len(rawPaths) == 0 {
return nil
}
seen := map[string]struct{}{}
for id := range rawIDs {
seen[id] = struct{}{}
}
for id := range rawPaths {
seen[id] = struct{}{}
}
for id := range seen {
relPath, _ := rawPaths[id].(string)
if relPath != "" {
if err := s.MarkPath(ctx, id, relPath); err != nil {
return fmt.Errorf("migrate imported path %q: %w", id, err)
}
continue
}
if err := s.MarkImported(ctx, id); err != nil {
return fmt.Errorf("migrate imported id %q: %w", id, err)
}
}
return nil
}