package discovery import ( "context" "encoding/json" "errors" "log/slog" "strings" "sync" "time" "github.com/jackc/pgx/v5/pgxpool" "github.com/ultisuite/ulti-backend/internal/llm" "github.com/ultisuite/ulti-backend/internal/websearch" ) type Service struct { db *pgxpool.Pool nc ncLister llm *llm.Client websearch *websearch.Client logger *slog.Logger scanCancels sync.Map // scanID -> context.CancelFunc } type ncLister interface { ListContacts(ctx context.Context, userID, bookPath string) ([]ncContact, error) } type ncContact struct { UID string FullName string Email string Phone string Org string } func NewService(db *pgxpool.Pool) *Service { return &Service{ db: db, llm: llm.NewClient(), websearch: websearch.NewClient(), logger: slog.Default().With("component", "contact-discovery"), } } func (s *Service) SetNextcloud(l ncLister) { s.nc = l } type mailAddress struct { Name string `json:"name"` Address string `json:"address"` } func (s *Service) GetScan(ctx context.Context, externalUserID, scanID string) (Scan, error) { var scan Scan var completedAt *time.Time var phase string err := s.db.QueryRow(ctx, ` SELECT sc.id::text, sc.status, sc.phase, sc.messages_scanned, sc.total_messages, sc.profiles_found, COALESCE(sc.profiles_total, 0), COALESCE(sc.error_message, ''), sc.started_at, sc.updated_at, sc.completed_at FROM contact_discovery_scans sc JOIN users u ON sc.user_id = u.id WHERE u.external_id = $1 AND sc.id = $2::uuid `, externalUserID, scanID).Scan( &scan.ID, &scan.Status, &phase, &scan.MessagesScanned, &scan.TotalMessages, &scan.ProfilesFound, &scan.ProfilesTotal, &scan.ErrorMessage, &scan.StartedAt, &scan.UpdatedAt, &completedAt, ) if err != nil { return Scan{}, err } scan.Phase = ScanPhase(phase) scan.CompletedAt = completedAt scan.ProgressPercent = scanProgressPercent(scan.Phase, scan.MessagesScanned, scan.TotalMessages, scan.ProfilesFound, scan.ProfilesTotal) return scan, nil } func (s *Service) runScan(externalUserID, ncUserID, bookID, scanID string) { scanCtx, cancel := context.WithCancel(context.Background()) s.scanCancels.Store(scanID, cancel) defer func() { cancel() s.scanCancels.Delete(scanID) if r := recover(); r != nil { s.logger.Error("contact discovery scan panicked", "scan_id", scanID, "panic", r) _, _ = s.db.Exec(context.Background(), ` UPDATE contact_discovery_scans SET status = 'failed', phase = 'done', error_message = 'scan crashed', completed_at = NOW(), updated_at = NOW() WHERE id = $1::uuid AND user_id = (SELECT id FROM users WHERE external_id = $2) AND status = 'running' `, scanID, externalUserID) } }() err := s.executeScan(scanCtx, externalUserID, ncUserID, bookID, scanID) if errors.Is(err, context.Canceled) { return } status := ScanCompleted phase := PhaseDone errMsg := "" if err != nil { status = ScanFailed errMsg = err.Error() s.logger.Error("contact discovery scan failed", "scan_id", scanID, "error", err) } _, _ = s.db.Exec(scanCtx, ` UPDATE contact_discovery_scans SET status = $3, phase = $4, error_message = NULLIF($5, ''), completed_at = NOW(), updated_at = NOW() WHERE id = $1::uuid AND user_id = (SELECT id FROM users WHERE external_id = $2) AND status = 'running' `, scanID, externalUserID, status, phase, errMsg) } func (s *Service) enrichHeartbeat(ctx context.Context, scanID, externalUserID string, messagesScanned, enrichDone, totalMessages, enrichTotal int) { ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: progress := enrichDone + 1 if progress > enrichTotal { progress = enrichTotal } _ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseEnriching, messagesScanned, progress, totalMessages, enrichTotal) } } } func (s *Service) executeScan(ctx context.Context, externalUserID, ncUserID, bookID, scanID string) error { rejections, err := s.loadRejections(ctx, externalUserID) if err != nil { return err } existingEmails, err := s.loadExistingContactEmails(ctx, externalUserID) if err != nil { return err } var totalMessages int _ = s.db.QueryRow(ctx, ` SELECT COUNT(*)::int FROM messages m JOIN mail_accounts ma ON m.account_id = ma.id WHERE ma.user_id = (SELECT id FROM users WHERE external_id = $1) `, externalUserID).Scan(&totalMessages) aggs := map[string]*addressAgg{} messagesScanned := 0 lastID := "" for { batch, err := s.scanMessageBatch(ctx, externalUserID, lastID, scanBatchSize) if err != nil { return err } if len(batch) == 0 { break } for _, row := range batch { s.processMessageRow(row, aggs, rejections, existingEmails) lastID = row.ID messagesScanned++ if messagesScanned%scanProgressEvery == 0 { _ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseScanning, messagesScanned, 0, totalMessages, 0) } } _ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseScanning, messagesScanned, 0, totalMessages, 0) if len(batch) < scanBatchSize { break } } profilesTotal := len(aggs) _ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseProfiles, messagesScanned, 0, totalMessages, profilesTotal) llmSettings, _ := s.loadLLMSettings(ctx, externalUserID) llmEnabled := llmSettingsHasProvider(llmSettings) enrichCandidates := selectPreEnrichCandidates(aggs, maxLLMEnrichPerScan) enrichSet := enrichCandidateSet(enrichCandidates) profilesFound := 0 profileIndex := 0 profileIDs := make(map[string]string, len(aggs)) for email, agg := range aggs { if err := ctx.Err(); err != nil { return err } isML, isDisp, isSpamHeavy, reason := classifyAddress(agg) allEmailsJSON, _ := json.Marshal([]EmailEntry{{ Email: email, DisplayName: agg.DisplayName, MessageCount: agg.MessageCount, }}) accountsJSON, _ := accountHitsToJSON(agg.Accounts) var profileID string err := s.db.QueryRow(ctx, ` INSERT INTO contact_discovered_profiles ( user_id, scan_id, display_name, primary_email, all_emails, message_count, sent_count, received_count, outbound_count, inbound_from_cc_count, copresence_cc_bcc_count, spam_count, forwarded_count, is_mailing_list, is_disposable, is_spam_heavy, classification_reason, last_message_at, enrichment_status, status, detected_in_accounts ) VALUES ( (SELECT id FROM users WHERE external_id = $1), $2::uuid, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, NULLIF($17, ''), $18, 'pending', 'suggested', $19::jsonb ) ON CONFLICT (user_id, primary_email) DO UPDATE SET scan_id = EXCLUDED.scan_id, display_name = CASE WHEN EXCLUDED.display_name != '' THEN EXCLUDED.display_name ELSE contact_discovered_profiles.display_name END, all_emails = EXCLUDED.all_emails, message_count = EXCLUDED.message_count, sent_count = EXCLUDED.sent_count, received_count = EXCLUDED.received_count, outbound_count = EXCLUDED.outbound_count, inbound_from_cc_count = EXCLUDED.inbound_from_cc_count, copresence_cc_bcc_count = EXCLUDED.copresence_cc_bcc_count, spam_count = EXCLUDED.spam_count, forwarded_count = EXCLUDED.forwarded_count, is_mailing_list = EXCLUDED.is_mailing_list, is_disposable = EXCLUDED.is_disposable, is_spam_heavy = EXCLUDED.is_spam_heavy, classification_reason = EXCLUDED.classification_reason, last_message_at = EXCLUDED.last_message_at, detected_in_accounts = EXCLUDED.detected_in_accounts, status = CASE WHEN contact_discovered_profiles.status IN ('rejected', 'ignored', 'blocked', 'accepted') THEN contact_discovered_profiles.status ELSE 'suggested' END, updated_at = NOW() RETURNING id::text `, externalUserID, scanID, agg.DisplayName, email, string(allEmailsJSON), agg.MessageCount, agg.SentCount, agg.ReceivedCount, agg.OutboundCount, agg.InboundFromCCCount, agg.CopresenceCCBCCCount, agg.SpamCount, agg.ForwardedCount, isML, isDisp, isSpamHeavy, reason, agg.LastSeen, string(accountsJSON), ).Scan(&profileID) if err != nil { return err } profileIDs[email] = profileID profilesFound++ profileIndex++ sigs := agg.Signatures if len(sigs) > 5 { sigs = sigs[:5] } if len(sigs) > 0 { _, _ = s.db.Exec(ctx, `DELETE FROM contact_discovered_signatures WHERE profile_id = $1::uuid`, profileID) for _, sig := range sigs { _, _ = s.db.Exec(ctx, ` INSERT INTO contact_discovered_signatures (profile_id, message_id, signature_text, message_date, confidence) VALUES ($1::uuid, NULLIF($2, '')::uuid, $3, $4, $5) `, profileID, sig.MessageID, sig.SignatureText, sig.MessageDate, sig.Confidence) } } skipEnrich := EnrichSkipped if !enrichSet[email] || !llmEnabled || len(sigs) == 0 { _, _ = s.db.Exec(ctx, ` UPDATE contact_discovered_profiles SET enrichment_status = $2 WHERE id = $1::uuid `, profileID, skipEnrich) } if profileIndex%profileProgressEvery == 0 { _ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseProfiles, messagesScanned, profileIndex, totalMessages, profilesTotal) } } _ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseProfiles, messagesScanned, profilesTotal, totalMessages, profilesTotal) enrichDone := 0 enrichTotal := len(enrichCandidates) for _, c := range enrichCandidates { if err := ctx.Err(); err != nil { return err } email := c.email agg := c.agg profileID := profileIDs[email] if profileID == "" { enrichDone++ continue } progress := enrichDone + 1 if progress > enrichTotal { progress = enrichTotal } _ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseEnriching, messagesScanned, progress, totalMessages, enrichTotal) s.fillSignaturesFromStoredMessages(ctx, email, agg) sigs := agg.Signatures if len(sigs) > 5 { sigs = sigs[:5] } if len(sigs) == 0 { _, _ = s.db.Exec(ctx, ` UPDATE contact_discovered_profiles SET enrichment_status = $2 WHERE id = $1::uuid `, profileID, EnrichSkipped) enrichDone++ continue } sigEntries := make([]SignatureEntry, 0, len(sigs)) for _, sig := range sigs { sigEntries = append(sigEntries, SignatureEntry{ MessageID: sig.MessageID, SignatureText: sig.SignatureText, MessageDate: sig.MessageDate, Confidence: sig.Confidence, }) } if enrichDone > 0 { select { case <-ctx.Done(): return ctx.Err() case <-time.After(llmEnrichDelay): } } heartbeatCtx, heartbeatCancel := context.WithCancel(context.Background()) go s.enrichHeartbeat(heartbeatCtx, scanID, externalUserID, messagesScanned, enrichDone, totalMessages, enrichTotal) enriched, enrichErr := enrichWithLLMTimeout(ctx, s.db, externalUserID, s.llm, llmSettings, email, agg.DisplayName, sigEntries, llmEnrichTimeout) heartbeatCancel() if enrichErr != nil { s.logger.Warn("llm enrichment failed", "email", email, "error", enrichErr) _, _ = s.db.Exec(ctx, ` UPDATE contact_discovered_profiles SET enrichment_status = 'failed' WHERE id = $1::uuid `, profileID) enrichDone++ continue } _ = s.applyEnrichmentResults(ctx, externalUserID, profileID, email, enriched, ncUserID, bookID, rejections) enrichDone++ _ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseEnriching, messagesScanned, enrichDone, totalMessages, enrichTotal) } _ = s.assignPersonGroups(ctx, externalUserID) s.inferMissingCompanies(ctx, externalUserID, ncUserID, bookID) _, err = s.db.Exec(ctx, ` UPDATE contact_discovery_scans SET messages_scanned = $3, profiles_found = $4, profiles_total = $5, total_messages = $6, phase = 'done', updated_at = NOW() WHERE id = $1::uuid AND user_id = (SELECT id FROM users WHERE external_id = $2) `, scanID, externalUserID, messagesScanned, profilesFound, profilesTotal, totalMessages) return err } func llmSettingsHasProvider(settings llm.Settings) bool { _, _, err := llm.ResolveProvider(settings, "") return err == nil } type authInfo struct { ListUnsubscribe string `json:"list_unsubscribe"` MailedBy string `json:"mailed_by"` } func parseAuthInfo(raw []byte) authInfo { var a authInfo _ = json.Unmarshal(raw, &a) return a } func (row messageRow) rowIsMailingList(auth authInfo) bool { if strings.TrimSpace(auth.ListUnsubscribe) != "" { return true } for _, addr := range parseAddresses(row.FromAddr) { if isMailingListDomain(emailDomain(addr.Address)) { return true } } return false } func parseAddresses(raw []byte) []mailAddress { if len(raw) == 0 { return nil } var addrs []mailAddress if err := json.Unmarshal(raw, &addrs); err != nil { return nil } for i := range addrs { addrs[i].Address = strings.ToLower(strings.TrimSpace(addrs[i].Address)) } return addrs } func isSpamMessage(flags, labels []string) bool { for _, f := range flags { if strings.EqualFold(f, "spam") || strings.EqualFold(f, "junk") { return true } } for _, l := range labels { ll := strings.ToLower(l) if strings.Contains(ll, "spam") || strings.Contains(ll, "junk") || strings.Contains(ll, "indésirable") { return true } } return false } func isOwnAddress(email, accountEmail string) bool { return strings.EqualFold(strings.TrimSpace(email), strings.TrimSpace(accountEmail)) } func accountIsMessageRecipient(accountEmail string, to, cc, bcc []mailAddress) bool { accountEmail = strings.ToLower(strings.TrimSpace(accountEmail)) if accountEmail == "" { return false } for _, list := range [][]mailAddress{to, cc, bcc} { for _, addr := range list { if strings.ToLower(strings.TrimSpace(addr.Address)) == accountEmail { return true } } } return false } func (s *Service) loadRejections(ctx context.Context, externalUserID string) (map[string]bool, error) { out := map[string]bool{} rows, err := s.db.Query(ctx, ` SELECT rejection_key FROM contact_discovery_rejections WHERE user_id = (SELECT id FROM users WHERE external_id = $1) `, externalUserID) if err != nil { return nil, err } defer rows.Close() for rows.Next() { var key string if err := rows.Scan(&key); err != nil { return nil, err } out[key] = true } return out, rows.Err() } func (s *Service) loadExistingContactEmails(ctx context.Context, externalUserID string) (map[string]bool, error) { // Profiles already accepted are excluded; for now we rely on CardDAV sync on frontend // Backend stores linked_contact_uid for accepted profiles out := map[string]bool{} rows, err := s.db.Query(ctx, ` SELECT lower(primary_email) FROM contact_discovered_profiles WHERE user_id = (SELECT id FROM users WHERE external_id = $1) AND status = 'accepted' `, externalUserID) if err != nil { return out, nil } defer rows.Close() for rows.Next() { var email string if err := rows.Scan(&email); err != nil { return out, nil } out[email] = true } return out, nil } func (s *Service) loadLLMSettings(ctx context.Context, externalUserID string) (llm.Settings, error) { var raw []byte err := s.db.QueryRow(ctx, ` SELECT COALESCE(s.preferences->'llm', '{}'::jsonb) FROM users u LEFT JOIN settings s ON s.user_id = u.id WHERE u.external_id = $1 `, externalUserID).Scan(&raw) if err != nil { return llm.Settings{}, err } var settings llm.Settings _ = json.Unmarshal(raw, &settings) return settings, nil }