ultisuite-backend/internal/migration/calendar_import.go
R3D347HR4Y 7143a36c19
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
feat(mail): integrate Stalwart hosted mail and migration features
- Added configuration options for Stalwart hosted mail in .env.example.
- Updated Docker Compose to include Stalwart service with health checks.
- Introduced new API endpoints for managing mail domains and migration projects.
- Enhanced Authentik blueprints for user enrollment and post-migration security.
- Updated OAuth handling for Google and Microsoft migration processes.
- Improved error handling and response structures in the mail API.
- Added integration tests for email claiming and migration workflows.
2026-06-13 12:47:08 +02:00

543 lines
16 KiB
Go

package migration
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/nextcloud"
)
const migrationCalendarID = "migration-import"
type CalendarImporter struct {
db *pgxpool.Pool
nc *nextcloud.Client
client *http.Client
userUPN string
}
func NewCalendarImporter(db *pgxpool.Pool, nc *nextcloud.Client) *CalendarImporter {
return &CalendarImporter{db: db, nc: nc, client: migrationHTTPClient()}
}
func (c *CalendarImporter) WithUserPrincipal(upn string) *CalendarImporter {
c.userUPN = strings.TrimSpace(upn)
return c
}
func (c *CalendarImporter) WithHTTPClient(client *http.Client) *CalendarImporter {
if client != nil {
c.client = client
}
return c
}
func (c *CalendarImporter) ImportBatch(ctx context.Context, job *Job, accessToken, provider string, delta bool, update progressUpdater) error {
if c.nc == nil {
return fmt.Errorf("nextcloud required for calendar migration")
}
user, err := resolveMigrationUser(ctx, c.db, job.UserID)
if err != nil {
return err
}
ncUserID := nextcloud.UserIDFromClaims(user.Email, user.ExternalID)
if _, err := c.nc.EnsurePrincipal(ctx, user.Email, user.ExternalID, user.Name); err != nil {
return fmt.Errorf("nextcloud user: %w", err)
}
calPath, err := c.ensureMigrationCalendar(ctx, ncUserID)
if err != nil {
return err
}
items, err := LoadImportedItemStore(ctx, c.db, job.ID, job.CursorJSON)
if err != nil {
return err
}
if delta && c.hasDeltaCursor(job, provider) {
return c.importDelta(ctx, job, accessToken, provider, ncUserID, calPath, items, update)
}
return c.importFull(ctx, job, accessToken, provider, ncUserID, calPath, delta, items, update)
}
func (c *CalendarImporter) hasDeltaCursor(job *Job, provider string) bool {
if provider == "google" {
return len(calendarSyncTokens(job.CursorJSON)) > 0
}
return len(calendarDeltaLinks(job.CursorJSON)) > 0
}
func (c *CalendarImporter) importFull(ctx context.Context, job *Job, accessToken, provider, ncUserID, calPath string, captureDelta bool, items *ImportedItemStore, update progressUpdater) error {
imported, _ := job.StatsJSON["imported"].(float64)
batch := 0
calIndex := int(jsonNumber(job.CursorJSON["calendarIndex"]))
sourceCalendars, err := c.listSourceCalendars(ctx, accessToken, provider)
if err != nil {
return err
}
if len(sourceCalendars) == 0 {
job.StatsJSON["imported"] = imported
job.StatsJSON["phase"] = "imported"
return update("completed", job.CursorJSON, job.StatsJSON, "")
}
if calIndex >= len(sourceCalendars) {
job.StatsJSON["imported"] = imported
job.StatsJSON["phase"] = "imported"
return update("completed", job.CursorJSON, job.StatsJSON, "")
}
sourceCal := sourceCalendars[calIndex]
pageToken, _ := job.CursorJSON["pageToken"].(string)
events, nextToken, syncToken, err := c.listSourceEvents(ctx, accessToken, provider, sourceCal, pageToken, "")
if err != nil {
return err
}
listIndex := int(jsonNumber(job.CursorJSON["listIndex"]))
for i := listIndex; i < len(events) && batch < mailImportBatchSize(); i++ {
ev := events[i]
if alreadyImported(items, ev.SourceID) {
continue
}
if err := c.nc.CreateEvent(ctx, ncUserID, calPath, ev.ToNextcloudEvent(provider)); err != nil {
if markErr := items.MarkFailed(ctx, ev.SourceID, err.Error(), ""); markErr != nil {
return markErr
}
incJobStat(job.StatsJSON, "failed")
batch++
continue
}
if err := items.MarkImported(ctx, ev.SourceID); err != nil {
return err
}
imported++
batch++
}
job.StatsJSON["imported"] = imported
if listIndex+batch < len(events) {
job.CursorJSON["listIndex"] = float64(listIndex + batch)
return update("pending", job.CursorJSON, job.StatsJSON, "")
}
delete(job.CursorJSON, "listIndex")
if nextToken != "" {
job.CursorJSON["pageToken"] = nextToken
return update("pending", job.CursorJSON, job.StatsJSON, "")
}
delete(job.CursorJSON, "pageToken")
if captureDelta {
if provider == "google" && syncToken != "" {
setCalendarSyncToken(job.CursorJSON, sourceCal.ID, syncToken)
}
if provider != "google" {
if link, err := c.bootstrapCalendarDelta(ctx, accessToken, sourceCal.ID); err == nil && link != "" {
setCalendarDeltaLink(job.CursorJSON, sourceCal.ID, link)
}
}
}
job.CursorJSON["calendarIndex"] = float64(calIndex + 1)
return update("pending", job.CursorJSON, job.StatsJSON, "")
}
func (c *CalendarImporter) importDelta(ctx context.Context, job *Job, accessToken, provider, ncUserID, calPath string, items *ImportedItemStore, update progressUpdater) error {
calIndex := int(jsonNumber(job.CursorJSON["calendarIndex"]))
sourceCalendars, err := c.listSourceCalendars(ctx, accessToken, provider)
if err != nil {
return err
}
if calIndex >= len(sourceCalendars) {
job.StatsJSON["phase"] = "delta"
return update("completed", job.CursorJSON, job.StatsJSON, "")
}
sourceCal := sourceCalendars[calIndex]
imported, _ := job.StatsJSON["delta_imported"].(float64)
updated, _ := job.StatsJSON["delta_updated"].(float64)
deleted, _ := job.StatsJSON["delta_deleted"].(float64)
var events []sourceEvent
var nextCursor string
if provider == "google" {
syncToken := calendarSyncTokens(job.CursorJSON)[sourceCal.ID]
pageToken, _ := job.CursorJSON["pageToken"].(string)
var syncTokenOut string
events, nextCursor, syncTokenOut, err = c.listSourceEvents(ctx, accessToken, provider, sourceCal, pageToken, syncToken)
if syncTokenOut != "" {
setCalendarSyncToken(job.CursorJSON, sourceCal.ID, syncTokenOut)
}
} else {
deltaLink := calendarDeltaLinks(job.CursorJSON)[sourceCal.ID]
if deltaLink == "" {
deltaLink, _ = job.CursorJSON["pageToken"].(string)
}
events, nextCursor, err = c.listMicrosoftCalendarDelta(ctx, accessToken, sourceCal.ID, deltaLink)
if nextCursor != "" && strings.Contains(nextCursor, "delta") {
setCalendarDeltaLink(job.CursorJSON, sourceCal.ID, nextCursor)
}
}
if err != nil {
return err
}
batch := 0
listIndex := int(jsonNumber(job.CursorJSON["listIndex"]))
for i := listIndex; i < len(events) && batch < mailImportBatchSize(); i++ {
ev := events[i]
if ev.Deleted {
eventPath := migrationEventPath(calPath, provider, ev.SourceID)
if err := c.nc.DeleteEvent(ctx, ncUserID, eventPath); err != nil && !isDeleteNotFound(err) {
return err
}
if err := items.Unmark(ctx, ev.SourceID); err != nil {
return err
}
deleted++
batch++
continue
}
wasUpdate, err := c.upsertEvent(ctx, ncUserID, calPath, provider, ev, items)
if err != nil {
return err
}
if wasUpdate {
updated++
} else if items.Has(ev.SourceID) {
imported++
} else {
incJobStat(job.StatsJSON, "failed")
}
batch++
}
job.StatsJSON["delta_imported"] = imported
job.StatsJSON["delta_updated"] = updated
job.StatsJSON["delta_deleted"] = deleted
if listIndex+batch < len(events) {
job.CursorJSON["listIndex"] = float64(listIndex + batch)
return update("pending", job.CursorJSON, job.StatsJSON, "")
}
delete(job.CursorJSON, "listIndex")
if nextCursor != "" {
job.CursorJSON["pageToken"] = nextCursor
return update("pending", job.CursorJSON, job.StatsJSON, "")
}
delete(job.CursorJSON, "pageToken")
job.CursorJSON["calendarIndex"] = float64(calIndex + 1)
return update("pending", job.CursorJSON, job.StatsJSON, "")
}
func (c *CalendarImporter) bootstrapCalendarDelta(ctx context.Context, accessToken, calID string) (string, error) {
url := graphMicrosoftURL(c.userUPN, fmt.Sprintf("/calendars/%s/events/delta?$select=id,subject,body,start,end,isAllDay,location", url.PathEscape(calID)))
body, err := apiGet(ctx, c.client, url, accessToken)
if err != nil {
return "", err
}
var parsed struct {
NextLink string `json:"@odata.nextLink"`
DeltaLink string `json:"@odata.deltaLink"`
}
if err := json.Unmarshal(body, &parsed); err != nil {
return "", err
}
if parsed.DeltaLink != "" {
return parsed.DeltaLink, nil
}
return parsed.NextLink, nil
}
func (c *CalendarImporter) listMicrosoftCalendarDelta(ctx context.Context, accessToken, calID, deltaLink string) ([]sourceEvent, string, error) {
if deltaLink == "" {
deltaLink = graphMicrosoftURL(c.userUPN, fmt.Sprintf("/calendars/%s/events/delta?$select=id,subject,body,start,end,isAllDay,location", url.PathEscape(calID)))
}
body, err := apiGet(ctx, c.client, deltaLink, accessToken)
if err != nil {
return nil, "", err
}
var parsed struct {
Value []graphCalendarEvent `json:"value"`
NextLink string `json:"@odata.nextLink"`
DeltaLink string `json:"@odata.deltaLink"`
}
if err := json.Unmarshal(body, &parsed); err != nil {
return nil, "", err
}
out := make([]sourceEvent, 0, len(parsed.Value))
for _, item := range parsed.Value {
out = append(out, item.toSourceEvent(calID))
}
next := parsed.NextLink
if parsed.DeltaLink != "" && parsed.NextLink == "" {
next = parsed.DeltaLink
}
return out, next, nil
}
func (c *CalendarImporter) ensureMigrationCalendar(ctx context.Context, ncUserID string) (string, error) {
path := fmt.Sprintf("/remote.php/dav/calendars/%s/%s/", ncUserID, migrationCalendarID)
if err := c.nc.CreateCalendar(ctx, ncUserID, migrationCalendarID, "Migration Import", "#1a73e8"); err != nil {
msg := strings.ToLower(err.Error())
if !strings.Contains(msg, "405") && !strings.Contains(msg, "409") && !strings.Contains(msg, "423") {
return "", err
}
}
return path, nil
}
type sourceCalendar struct {
ID string
Name string
}
type sourceEvent struct {
SourceID string
Summary string
Description string
Location string
Start time.Time
End time.Time
AllDay bool
Deleted bool
}
func (e sourceEvent) ToNextcloudEvent(provider string) *nextcloud.Event {
uid := sanitizeMigrationUID(provider, e.SourceID)
start := e.Start.UTC()
end := e.End.UTC()
if end.IsZero() || !end.After(start) {
end = start.Add(time.Hour)
}
ev := &nextcloud.Event{
UID: uid,
Summary: e.Summary,
Description: e.Description,
Location: e.Location,
AllDay: e.AllDay,
}
if e.AllDay {
ev.Start = start.Format("20060102")
ev.End = end.Format("20060102")
} else {
ev.Start = start.Format("20060102T150405Z")
ev.End = end.Format("20060102T150405Z")
}
return ev
}
func (c *CalendarImporter) listSourceCalendars(ctx context.Context, accessToken, provider string) ([]sourceCalendar, error) {
switch provider {
case "google":
body, err := apiGet(ctx, c.client, "https://www.googleapis.com/calendar/v3/users/me/calendarList?maxResults=100", accessToken)
if err != nil {
return nil, err
}
var parsed struct {
Items []struct {
ID string `json:"id"`
Summary string `json:"summary"`
} `json:"items"`
}
if err := json.Unmarshal(body, &parsed); err != nil {
return nil, err
}
out := make([]sourceCalendar, 0, len(parsed.Items))
for _, item := range parsed.Items {
out = append(out, sourceCalendar{ID: item.ID, Name: item.Summary})
}
return out, nil
default:
body, err := apiGet(ctx, c.client, graphMicrosoftURL(c.userUPN, "/calendars?$top=100"), accessToken)
if err != nil {
return nil, err
}
var parsed struct {
Value []struct {
ID string `json:"id"`
Name string `json:"name"`
} `json:"value"`
}
if err := json.Unmarshal(body, &parsed); err != nil {
return nil, err
}
out := make([]sourceCalendar, 0, len(parsed.Value))
for _, item := range parsed.Value {
out = append(out, sourceCalendar{ID: item.ID, Name: item.Name})
}
return out, nil
}
}
func (c *CalendarImporter) listSourceEvents(ctx context.Context, accessToken, provider string, cal sourceCalendar, pageToken, syncToken string) ([]sourceEvent, string, string, error) {
switch provider {
case "google":
listURL := fmt.Sprintf(
"https://www.googleapis.com/calendar/v3/calendars/%s/events?maxResults=100&singleEvents=true&orderBy=startTime",
url.PathEscape(cal.ID),
)
if syncToken != "" {
listURL += "&syncToken=" + url.QueryEscape(syncToken) + "&showDeleted=true"
} else if pageToken != "" {
listURL += "&pageToken=" + url.QueryEscape(pageToken)
}
body, err := apiGet(ctx, c.client, listURL, accessToken)
if err != nil {
return nil, "", "", err
}
var parsed struct {
Items []googleCalendarEvent `json:"items"`
NextPageToken string `json:"nextPageToken"`
NextSyncToken string `json:"nextSyncToken"`
}
if err := json.Unmarshal(body, &parsed); err != nil {
return nil, "", "", err
}
out := make([]sourceEvent, 0, len(parsed.Items))
for _, item := range parsed.Items {
out = append(out, item.toSourceEvent(cal.ID))
}
return out, parsed.NextPageToken, parsed.NextSyncToken, nil
default:
listURL := graphMicrosoftURL(c.userUPN, fmt.Sprintf("/calendars/%s/events?$top=100&$select=id,subject,body,start,end,isAllDay,location", url.PathEscape(cal.ID)))
if pageToken != "" {
listURL = pageToken
}
body, err := apiGet(ctx, c.client, listURL, accessToken)
if err != nil {
return nil, "", "", err
}
var parsed struct {
Value []graphCalendarEvent `json:"value"`
NextLink string `json:"@odata.nextLink"`
}
if err := json.Unmarshal(body, &parsed); err != nil {
return nil, "", "", err
}
out := make([]sourceEvent, 0, len(parsed.Value))
for _, item := range parsed.Value {
out = append(out, item.toSourceEvent(cal.ID))
}
return out, parsed.NextLink, "", nil
}
}
type googleCalendarEvent struct {
ID string `json:"id"`
Status string `json:"status"`
Summary string `json:"summary"`
Description string `json:"description"`
Location string `json:"location"`
Start struct {
DateTime string `json:"dateTime"`
Date string `json:"date"`
} `json:"start"`
End struct {
DateTime string `json:"dateTime"`
Date string `json:"date"`
} `json:"end"`
}
func (e googleCalendarEvent) toSourceEvent(calID string) sourceEvent {
allDay := e.Start.Date != ""
start := parseFlexibleTime(e.Start.DateTime, e.Start.Date)
end := parseFlexibleTime(e.End.DateTime, e.End.Date)
return sourceEvent{
SourceID: calID + ":" + e.ID,
Summary: e.Summary,
Description: e.Description,
Location: e.Location,
Start: start,
End: end,
AllDay: allDay,
Deleted: e.Status == "cancelled",
}
}
type graphCalendarEvent struct {
ID string `json:"id"`
Removed *struct {
Reason string `json:"reason"`
} `json:"@removed"`
Subject string `json:"subject"`
Body struct {
Content string `json:"content"`
} `json:"body"`
IsAllDay bool `json:"isAllDay"`
Location struct {
DisplayName string `json:"displayName"`
} `json:"location"`
Start graphDateTime `json:"start"`
End graphDateTime `json:"end"`
}
type graphDateTime struct {
DateTime string `json:"dateTime"`
Date string `json:"date"`
}
func (e graphCalendarEvent) toSourceEvent(calID string) sourceEvent {
allDay := e.IsAllDay || e.Start.Date != ""
start := parseFlexibleTime(e.Start.DateTime, e.Start.Date)
end := parseFlexibleTime(e.End.DateTime, e.End.Date)
return sourceEvent{
SourceID: calID + ":" + e.ID,
Summary: e.Subject,
Description: e.Body.Content,
Location: e.Location.DisplayName,
Start: start,
End: end,
AllDay: allDay,
Deleted: e.Removed != nil,
}
}
func parseFlexibleTime(dateTime, date string) time.Time {
if strings.TrimSpace(dateTime) != "" {
if t, err := time.Parse(time.RFC3339, dateTime); err == nil {
return t.UTC()
}
}
if strings.TrimSpace(date) != "" {
if t, err := time.Parse("2006-01-02", date); err == nil {
return t.UTC()
}
}
return time.Time{}
}
func (c *CalendarImporter) upsertEvent(
ctx context.Context,
ncUserID, calPath, provider string,
ev sourceEvent,
items *ImportedItemStore,
) (updated bool, err error) {
ncEv := ev.ToNextcloudEvent(provider)
if alreadyImported(items, ev.SourceID) {
eventPath := migrationEventPath(calPath, provider, ev.SourceID)
if _, err := c.nc.UpdateEvent(ctx, ncUserID, eventPath, "", ncEv); err != nil {
return false, err
}
return true, nil
}
if err := c.nc.CreateEvent(ctx, ncUserID, calPath, ncEv); err != nil {
if markErr := items.MarkFailed(ctx, ev.SourceID, err.Error(), ""); markErr != nil {
return false, markErr
}
return false, nil
}
if err := items.MarkImported(ctx, ev.SourceID); err != nil {
return false, err
}
return false, nil
}