- Added configuration options for Stalwart hosted mail in .env.example.
- Updated Docker Compose to include the Stalwart service with health checks.
- Introduced new API endpoints for managing mail domains and migration projects.
- Enhanced Authentik blueprints for user enrollment and post-migration security.
- Updated OAuth handling for the Google and Microsoft migration processes.
- Improved error handling and response structures in the mail API.
- Added integration tests for email claiming and migration workflows.
477 lines
15 KiB
Go
477 lines
15 KiB
Go
package migration
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/ultisuite/ulti-backend/internal/nextcloud"
|
|
)
|
|
|
|
// migrationContactsBookID is the address book identifier passed to
// nextcloud.AddressBookPath when building the target book for imported contacts.
const migrationContactsBookID = "migration-import"
|
|
|
|
type ContactsImporter struct {
|
|
db *pgxpool.Pool
|
|
nc *nextcloud.Client
|
|
client *http.Client
|
|
userUPN string
|
|
}
|
|
|
|
func NewContactsImporter(db *pgxpool.Pool, nc *nextcloud.Client) *ContactsImporter {
|
|
return &ContactsImporter{db: db, nc: nc, client: migrationHTTPClient()}
|
|
}
|
|
|
|
func (c *ContactsImporter) WithUserPrincipal(upn string) *ContactsImporter {
|
|
c.userUPN = strings.TrimSpace(upn)
|
|
return c
|
|
}
|
|
|
|
func (c *ContactsImporter) WithHTTPClient(client *http.Client) *ContactsImporter {
|
|
if client != nil {
|
|
c.client = client
|
|
}
|
|
return c
|
|
}
|
|
|
|
func (c *ContactsImporter) ImportBatch(ctx context.Context, job *Job, accessToken, provider string, delta bool, update progressUpdater) error {
|
|
if c.nc == nil {
|
|
return fmt.Errorf("nextcloud required for contacts migration")
|
|
}
|
|
user, err := resolveMigrationUser(ctx, c.db, job.UserID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ncUserID := nextcloud.UserIDFromClaims(user.Email, user.ExternalID)
|
|
if _, err := c.nc.EnsurePrincipal(ctx, user.Email, user.ExternalID, user.Name); err != nil {
|
|
return fmt.Errorf("nextcloud user: %w", err)
|
|
}
|
|
bookPath := nextcloud.AddressBookPath(ncUserID, migrationContactsBookID)
|
|
items, err := LoadImportedItemStore(ctx, c.db, job.ID, job.CursorJSON)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if delta {
|
|
if provider == "google" {
|
|
if syncToken, _ := job.CursorJSON["syncToken"].(string); strings.TrimSpace(syncToken) != "" {
|
|
return c.importGoogleDelta(ctx, job, accessToken, ncUserID, bookPath, items, update)
|
|
}
|
|
} else if deltaLink, _ := job.CursorJSON["deltaLink"].(string); strings.TrimSpace(deltaLink) != "" {
|
|
return c.importMicrosoftDelta(ctx, job, accessToken, ncUserID, bookPath, deltaLink, items, update)
|
|
} else {
|
|
return c.bootstrapMicrosoftDelta(ctx, job, accessToken, ncUserID, bookPath, update)
|
|
}
|
|
}
|
|
|
|
switch provider {
|
|
case "google":
|
|
return c.importGoogleFull(ctx, job, accessToken, ncUserID, bookPath, delta, items, update)
|
|
default:
|
|
return c.importMicrosoftFull(ctx, job, accessToken, ncUserID, bookPath, delta, items, update)
|
|
}
|
|
}
|
|
|
|
func (c *ContactsImporter) importGoogleFull(ctx context.Context, job *Job, accessToken, ncUserID, bookPath string, captureToken bool, items *ImportedItemStore, update progressUpdater) error {
|
|
imported, _ := job.StatsJSON["imported"].(float64)
|
|
batch := 0
|
|
|
|
pageToken, _ := job.CursorJSON["pageToken"].(string)
|
|
listURL := "https://people.googleapis.com/v1/people/me/connections?pageSize=100&personFields=names,emailAddresses,phoneNumbers,organizations,metadata"
|
|
if captureToken {
|
|
listURL += "&requestSyncToken=true"
|
|
}
|
|
if pageToken != "" {
|
|
listURL += "&pageToken=" + url.QueryEscape(pageToken)
|
|
}
|
|
body, err := apiGet(ctx, c.client, listURL, accessToken)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var listed struct {
|
|
Connections []googlePerson `json:"connections"`
|
|
NextPageToken string `json:"nextPageToken"`
|
|
NextSyncToken string `json:"nextSyncToken"`
|
|
}
|
|
if err := json.Unmarshal(body, &listed); err != nil {
|
|
return err
|
|
}
|
|
|
|
listIndex := int(jsonNumber(job.CursorJSON["listIndex"]))
|
|
for i := listIndex; i < len(listed.Connections) && batch < mailImportBatchSize(); i++ {
|
|
person := listed.Connections[i]
|
|
sourceID := strings.TrimSpace(person.ResourceName)
|
|
if sourceID == "" {
|
|
sourceID = fmt.Sprintf("google-person-%d", i)
|
|
}
|
|
if alreadyImported(items, sourceID) {
|
|
continue
|
|
}
|
|
contact := googlePersonToContact(sourceID, person)
|
|
if contact.Email == "" && contact.FullName == "" {
|
|
continue
|
|
}
|
|
if _, err := c.nc.CreateContact(ctx, ncUserID, bookPath, contact); err != nil {
|
|
if markErr := items.MarkFailed(ctx, sourceID, err.Error(), ""); markErr != nil {
|
|
return markErr
|
|
}
|
|
incJobStat(job.StatsJSON, "failed")
|
|
batch++
|
|
continue
|
|
}
|
|
if err := items.MarkImported(ctx, sourceID); err != nil {
|
|
return err
|
|
}
|
|
imported++
|
|
batch++
|
|
}
|
|
job.StatsJSON["imported"] = imported
|
|
|
|
if listIndex+batch < len(listed.Connections) {
|
|
job.CursorJSON["listIndex"] = float64(listIndex + batch)
|
|
return update("pending", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
delete(job.CursorJSON, "listIndex")
|
|
|
|
if listed.NextPageToken != "" {
|
|
job.CursorJSON["pageToken"] = listed.NextPageToken
|
|
return update("pending", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
delete(job.CursorJSON, "pageToken")
|
|
if listed.NextSyncToken != "" {
|
|
job.CursorJSON["syncToken"] = listed.NextSyncToken
|
|
}
|
|
job.StatsJSON["phase"] = "imported"
|
|
return update("completed", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
|
|
func (c *ContactsImporter) importGoogleDelta(ctx context.Context, job *Job, accessToken, ncUserID, bookPath string, items *ImportedItemStore, update progressUpdater) error {
|
|
syncToken, _ := job.CursorJSON["syncToken"].(string)
|
|
listURL := "https://people.googleapis.com/v1/people/me/connections?syncToken=" + url.QueryEscape(syncToken) +
|
|
"&personFields=names,emailAddresses,phoneNumbers,organizations,metadata&requestSyncToken=true"
|
|
body, err := apiGet(ctx, c.client, listURL, accessToken)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var listed struct {
|
|
Connections []googlePerson `json:"connections"`
|
|
NextSyncToken string `json:"nextSyncToken"`
|
|
}
|
|
if err := json.Unmarshal(body, &listed); err != nil {
|
|
return err
|
|
}
|
|
deltaCount, _ := job.StatsJSON["delta_imported"].(float64)
|
|
deltaUpdated, _ := job.StatsJSON["delta_updated"].(float64)
|
|
deleted, _ := job.StatsJSON["delta_deleted"].(float64)
|
|
for _, person := range listed.Connections {
|
|
sourceID := strings.TrimSpace(person.ResourceName)
|
|
if sourceID == "" {
|
|
continue
|
|
}
|
|
if person.Metadata != nil && person.Metadata.Deleted {
|
|
contactPath := migrationContactPath(bookPath, "google", sourceID)
|
|
if err := c.nc.DeleteContact(ctx, ncUserID, contactPath); err != nil && !isDeleteNotFound(err) {
|
|
return err
|
|
}
|
|
if err := items.Unmark(ctx, sourceID); err != nil {
|
|
return err
|
|
}
|
|
deleted++
|
|
continue
|
|
}
|
|
contact := googlePersonToContact(sourceID, person)
|
|
if contact.Email == "" && contact.FullName == "" {
|
|
continue
|
|
}
|
|
updated, err := c.upsertContact(ctx, ncUserID, bookPath, "google", sourceID, contact, items)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if updated {
|
|
deltaUpdated++
|
|
} else if items.Has(sourceID) {
|
|
deltaCount++
|
|
} else {
|
|
incJobStat(job.StatsJSON, "failed")
|
|
}
|
|
}
|
|
if listed.NextSyncToken != "" {
|
|
job.CursorJSON["syncToken"] = listed.NextSyncToken
|
|
}
|
|
job.StatsJSON["delta_imported"] = deltaCount
|
|
job.StatsJSON["delta_updated"] = deltaUpdated
|
|
job.StatsJSON["delta_deleted"] = deleted
|
|
job.StatsJSON["phase"] = "delta"
|
|
return update("completed", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
|
|
func (c *ContactsImporter) importMicrosoftFull(ctx context.Context, job *Job, accessToken, ncUserID, bookPath string, captureDelta bool, items *ImportedItemStore, update progressUpdater) error {
|
|
imported, _ := job.StatsJSON["imported"].(float64)
|
|
batch := 0
|
|
|
|
nextLink, _ := job.CursorJSON["nextLink"].(string)
|
|
listURL := graphMicrosoftURL(c.userUPN, "/contacts?$top=100&$select=id,displayName,givenName,surname,emailAddresses,mobilePhone,businessPhones,companyName")
|
|
if nextLink != "" {
|
|
listURL = nextLink
|
|
}
|
|
body, err := apiGet(ctx, c.client, listURL, accessToken)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var listed struct {
|
|
Value []graphContact `json:"value"`
|
|
NextLink string `json:"@odata.nextLink"`
|
|
}
|
|
if err := json.Unmarshal(body, &listed); err != nil {
|
|
return err
|
|
}
|
|
|
|
listIndex := int(jsonNumber(job.CursorJSON["listIndex"]))
|
|
for i := listIndex; i < len(listed.Value) && batch < mailImportBatchSize(); i++ {
|
|
item := listed.Value[i]
|
|
sourceID := strings.TrimSpace(item.ID)
|
|
if sourceID == "" {
|
|
sourceID = fmt.Sprintf("graph-contact-%d", i)
|
|
}
|
|
if alreadyImported(items, sourceID) {
|
|
continue
|
|
}
|
|
contact := graphContactToNC(sourceID, item)
|
|
if contact.Email == "" && contact.FullName == "" {
|
|
continue
|
|
}
|
|
if _, err := c.nc.CreateContact(ctx, ncUserID, bookPath, contact); err != nil {
|
|
if markErr := items.MarkFailed(ctx, sourceID, err.Error(), ""); markErr != nil {
|
|
return markErr
|
|
}
|
|
incJobStat(job.StatsJSON, "failed")
|
|
batch++
|
|
continue
|
|
}
|
|
if err := items.MarkImported(ctx, sourceID); err != nil {
|
|
return err
|
|
}
|
|
imported++
|
|
batch++
|
|
}
|
|
job.StatsJSON["imported"] = imported
|
|
|
|
if listIndex+batch < len(listed.Value) {
|
|
job.CursorJSON["listIndex"] = float64(listIndex + batch)
|
|
return update("pending", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
delete(job.CursorJSON, "listIndex")
|
|
|
|
if listed.NextLink != "" {
|
|
job.CursorJSON["nextLink"] = listed.NextLink
|
|
return update("pending", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
delete(job.CursorJSON, "nextLink")
|
|
|
|
if captureDelta {
|
|
return c.bootstrapMicrosoftDelta(ctx, job, accessToken, ncUserID, bookPath, update)
|
|
}
|
|
job.StatsJSON["phase"] = "imported"
|
|
return update("completed", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
|
|
func (c *ContactsImporter) bootstrapMicrosoftDelta(ctx context.Context, job *Job, accessToken, ncUserID, bookPath string, update progressUpdater) error {
|
|
body, err := apiGet(ctx, c.client, graphMicrosoftURL(c.userUPN, "/contacts/delta?$select=id,displayName,givenName,surname,emailAddresses,mobilePhone,businessPhones,companyName"), accessToken)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var listed struct {
|
|
Value []graphContact `json:"value"`
|
|
NextLink string `json:"@odata.nextLink"`
|
|
DeltaLink string `json:"@odata.deltaLink"`
|
|
}
|
|
if err := json.Unmarshal(body, &listed); err != nil {
|
|
return err
|
|
}
|
|
if listed.DeltaLink != "" {
|
|
job.CursorJSON["deltaLink"] = listed.DeltaLink
|
|
job.StatsJSON["phase"] = "delta_ready"
|
|
return update("completed", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
if listed.NextLink != "" {
|
|
job.CursorJSON["nextLink"] = listed.NextLink
|
|
return update("pending", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
job.StatsJSON["phase"] = "imported"
|
|
return update("completed", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
|
|
func (c *ContactsImporter) importMicrosoftDelta(ctx context.Context, job *Job, accessToken, ncUserID, bookPath, deltaLink string, items *ImportedItemStore, update progressUpdater) error {
|
|
body, err := apiGet(ctx, c.client, deltaLink, accessToken)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var listed struct {
|
|
Value []graphContact `json:"value"`
|
|
NextLink string `json:"@odata.nextLink"`
|
|
DeltaLink string `json:"@odata.deltaLink"`
|
|
}
|
|
if err := json.Unmarshal(body, &listed); err != nil {
|
|
return err
|
|
}
|
|
deltaCount, _ := job.StatsJSON["delta_imported"].(float64)
|
|
deltaUpdated, _ := job.StatsJSON["delta_updated"].(float64)
|
|
deleted, _ := job.StatsJSON["delta_deleted"].(float64)
|
|
for _, item := range listed.Value {
|
|
sourceID := strings.TrimSpace(item.ID)
|
|
if sourceID == "" {
|
|
continue
|
|
}
|
|
if item.Removed != nil {
|
|
contactPath := migrationContactPath(bookPath, "microsoft", sourceID)
|
|
if err := c.nc.DeleteContact(ctx, ncUserID, contactPath); err != nil && !isDeleteNotFound(err) {
|
|
return err
|
|
}
|
|
if err := items.Unmark(ctx, sourceID); err != nil {
|
|
return err
|
|
}
|
|
deleted++
|
|
continue
|
|
}
|
|
contact := graphContactToNC(sourceID, item)
|
|
if contact.Email == "" && contact.FullName == "" {
|
|
continue
|
|
}
|
|
updated, err := c.upsertContact(ctx, ncUserID, bookPath, "microsoft", sourceID, contact, items)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if updated {
|
|
deltaUpdated++
|
|
} else if items.Has(sourceID) {
|
|
deltaCount++
|
|
} else {
|
|
incJobStat(job.StatsJSON, "failed")
|
|
}
|
|
}
|
|
if listed.NextLink != "" {
|
|
job.CursorJSON["deltaLink"] = listed.NextLink
|
|
return update("pending", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
if listed.DeltaLink != "" {
|
|
job.CursorJSON["deltaLink"] = listed.DeltaLink
|
|
}
|
|
job.StatsJSON["delta_imported"] = deltaCount
|
|
job.StatsJSON["delta_updated"] = deltaUpdated
|
|
job.StatsJSON["delta_deleted"] = deleted
|
|
job.StatsJSON["phase"] = "delta"
|
|
return update("completed", job.CursorJSON, job.StatsJSON, "")
|
|
}
|
|
|
|
// googlePerson is the subset of a Google People API person resource that the
// contacts importer decodes.
type googlePerson struct {
	// ResourceName is used as the source identifier of the contact.
	ResourceName string `json:"resourceName"`
	// Metadata is nil when the response carries no metadata object; its
	// Deleted flag is checked during incremental sync.
	Metadata *struct {
		Deleted bool `json:"deleted"`
	} `json:"metadata"`
	// Only the first element of each list below is read when converting to a
	// Nextcloud contact.
	Names []struct {
		DisplayName string `json:"displayName"`
	} `json:"names"`
	EmailAddresses []struct {
		Value string `json:"value"`
	} `json:"emailAddresses"`
	PhoneNumbers []struct {
		Value string `json:"value"`
	} `json:"phoneNumbers"`
	Organizations []struct {
		Name string `json:"name"`
	} `json:"organizations"`
}
|
|
|
|
// graphContact is the subset of a Microsoft Graph contact resource that the
// contacts importer decodes.
type graphContact struct {
	// ID is used as the source identifier of the contact.
	ID string `json:"id"`
	// Removed is non-nil when the entry carries an "@removed" annotation;
	// incremental sync treats such entries as deletions.
	Removed *struct {
		Reason string `json:"reason"`
	} `json:"@removed"`
	DisplayName string `json:"displayName"`
	GivenName   string `json:"givenName"`
	Surname     string `json:"surname"`
	MobilePhone string `json:"mobilePhone"`
	// BusinessPhones supplies the phone number only when MobilePhone is blank.
	BusinessPhones []string `json:"businessPhones"`
	CompanyName    string   `json:"companyName"`
	// Only the first address is read when converting to a Nextcloud contact.
	EmailAddresses []struct {
		Address string `json:"address"`
	} `json:"emailAddresses"`
}
|
|
|
|
func googlePersonToContact(sourceID string, p googlePerson) *nextcloud.Contact {
|
|
name := ""
|
|
if len(p.Names) > 0 {
|
|
name = strings.TrimSpace(p.Names[0].DisplayName)
|
|
}
|
|
email := ""
|
|
if len(p.EmailAddresses) > 0 {
|
|
email = strings.ToLower(strings.TrimSpace(p.EmailAddresses[0].Value))
|
|
}
|
|
phone := ""
|
|
if len(p.PhoneNumbers) > 0 {
|
|
phone = strings.TrimSpace(p.PhoneNumbers[0].Value)
|
|
}
|
|
org := ""
|
|
if len(p.Organizations) > 0 {
|
|
org = strings.TrimSpace(p.Organizations[0].Name)
|
|
}
|
|
return &nextcloud.Contact{
|
|
UID: sanitizeMigrationUID("google", sourceID),
|
|
FullName: name,
|
|
Email: email,
|
|
Phone: phone,
|
|
Org: org,
|
|
}
|
|
}
|
|
|
|
func graphContactToNC(sourceID string, c graphContact) *nextcloud.Contact {
|
|
name := strings.TrimSpace(c.DisplayName)
|
|
if name == "" {
|
|
name = strings.TrimSpace(strings.TrimSpace(c.GivenName + " " + c.Surname))
|
|
}
|
|
email := ""
|
|
if len(c.EmailAddresses) > 0 {
|
|
email = strings.ToLower(strings.TrimSpace(c.EmailAddresses[0].Address))
|
|
}
|
|
phone := strings.TrimSpace(c.MobilePhone)
|
|
if phone == "" && len(c.BusinessPhones) > 0 {
|
|
phone = strings.TrimSpace(c.BusinessPhones[0])
|
|
}
|
|
return &nextcloud.Contact{
|
|
UID: sanitizeMigrationUID("microsoft", sourceID),
|
|
FullName: name,
|
|
Email: email,
|
|
Phone: phone,
|
|
Org: strings.TrimSpace(c.CompanyName),
|
|
}
|
|
}
|
|
|
|
func (c *ContactsImporter) upsertContact(
|
|
ctx context.Context,
|
|
ncUserID, bookPath, provider, sourceID string,
|
|
contact *nextcloud.Contact,
|
|
items *ImportedItemStore,
|
|
) (updated bool, err error) {
|
|
if alreadyImported(items, sourceID) {
|
|
contactPath := migrationContactPath(bookPath, provider, sourceID)
|
|
if _, err := c.nc.UpdateContact(ctx, ncUserID, contactPath, "", contact); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
if _, err := c.nc.CreateContact(ctx, ncUserID, bookPath, contact); err != nil {
|
|
if markErr := items.MarkFailed(ctx, sourceID, err.Error(), ""); markErr != nil {
|
|
return false, markErr
|
|
}
|
|
return false, nil
|
|
}
|
|
if err := items.MarkImported(ctx, sourceID); err != nil {
|
|
return false, err
|
|
}
|
|
return false, nil
|
|
}
|