ultisuite-backend/internal/contacts/discovery/service.go
R3D347HR4Y 556d5f416d Enhance API and configuration for contact discovery and public sharing
- Introduced new endpoints for contact discovery, including scanning, listing, and managing discovered contacts.
- Implemented retry logic for handling missing DAV credentials during contact operations.
- Added public share functionality for drive API, allowing users to manage public shares, including upload, delete, and rename operations.
- Updated Nextcloud configuration to support public share links and improved error handling for public share permissions.
- Enhanced logging and validation across contact and drive APIs for better error tracking and user feedback.
- Added tests for new contact matching and ranking functionalities to ensure accuracy and reliability.
2026-06-06 20:27:02 +02:00

498 lines
15 KiB
Go

package discovery
import (
"context"
"encoding/json"
"errors"
"log/slog"
"strings"
"sync"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/llm"
"github.com/ultisuite/ulti-backend/internal/websearch"
)
// Service runs contact discovery scans and serves their stored state. It
// bundles the database pool with the clients and bookkeeping a scan needs.
type Service struct {
// db is the PostgreSQL pool used for every query issued by the service.
db *pgxpool.Pool
// nc is the contact lister installed through SetNextcloud; NewService
// leaves it nil.
nc ncLister
// llm is the client handed to the LLM enrichment step of a scan.
llm *llm.Client
// websearch is created by NewService; it is not used in the visible part
// of this file.
websearch *websearch.Client
// logger is the default slog logger tagged with component=contact-discovery.
logger *slog.Logger
// scanCancels lets a running scan be cancelled by its ID; runScan stores
// the entry when a scan starts and deletes it when the scan ends.
scanCancels sync.Map // scanID -> context.CancelFunc
}
// ncLister is the contact-listing capability the service depends on. An
// implementation is injected with SetNextcloud.
type ncLister interface {
// ListContacts returns the contacts found for userID under bookPath.
// NOTE(review): presumably bookPath is an address-book path on the
// Nextcloud side — confirm against the implementation.
ListContacts(ctx context.Context, userID, bookPath string) ([]ncContact, error)
}
// ncContact is one contact as returned by ncLister.ListContacts.
// NOTE(review): the fields presumably mirror the matching vCard properties
// (UID, FN, EMAIL, TEL, ORG) — confirm against the lister implementation.
type ncContact struct {
UID string
FullName string
Email string
Phone string
Org string
}
// NewService builds a Service around the given database pool, creating fresh
// LLM and web-search clients and a logger tagged with the
// "contact-discovery" component. The Nextcloud lister is not set here; call
// SetNextcloud to install one.
func NewService(db *pgxpool.Pool) *Service {
	svc := new(Service)
	svc.db = db
	svc.llm = llm.NewClient()
	svc.websearch = websearch.NewClient()
	svc.logger = slog.Default().With("component", "contact-discovery")
	return svc
}
// SetNextcloud installs the contact lister used by the service, replacing
// any previously configured one. No synchronization is performed here.
func (s *Service) SetNextcloud(lister ncLister) {
	s.nc = lister
}
// mailAddress is one entry of a JSON-encoded address list, as decoded by
// parseAddresses (which also lower-cases and trims Address).
type mailAddress struct {
// Name is the display name attached to the address.
Name string `json:"name"`
// Address is the e-mail address itself.
Address string `json:"address"`
}
// GetScan loads a single discovery scan by ID, restricted to scans owned by
// the user whose external ID is externalUserID, and fills in the derived
// progress percentage.
//
// Any error from the query or row scan is returned unchanged together with
// a zero Scan.
func (s *Service) GetScan(ctx context.Context, externalUserID, scanID string) (Scan, error) {
	var (
		result     Scan
		rawPhase   string
		finishedAt *time.Time
	)

	row := s.db.QueryRow(ctx, `
SELECT sc.id::text, sc.status, sc.phase, sc.messages_scanned, sc.total_messages,
sc.profiles_found, COALESCE(sc.profiles_total, 0), COALESCE(sc.error_message, ''), sc.started_at, sc.updated_at,
sc.completed_at
FROM contact_discovery_scans sc
JOIN users u ON sc.user_id = u.id
WHERE u.external_id = $1 AND sc.id = $2::uuid
`, externalUserID, scanID)

	if err := row.Scan(
		&result.ID, &result.Status, &rawPhase, &result.MessagesScanned, &result.TotalMessages,
		&result.ProfilesFound, &result.ProfilesTotal, &result.ErrorMessage, &result.StartedAt, &result.UpdatedAt, &finishedAt,
	); err != nil {
		return Scan{}, err
	}

	// The phase is read as plain text and converted to the typed value here.
	result.Phase = ScanPhase(rawPhase)
	// completed_at is nullable, hence the pointer.
	result.CompletedAt = finishedAt
	result.ProgressPercent = scanProgressPercent(
		result.Phase,
		result.MessagesScanned,
		result.TotalMessages,
		result.ProfilesFound,
		result.ProfilesTotal,
	)
	return result, nil
}
// runScan drives one discovery scan from start to finish and records its
// terminal state.
//
// It creates a cancellable context for the scan, registers the cancel
// function under scanID in s.scanCancels for the duration of the run, and
// calls executeScan. Afterwards:
//   - if executeScan reports context.Canceled, nothing is written here and
//     the row is left as it is;
//   - otherwise the scan row is moved to completed or failed (with the error
//     text), but only while its status is still 'running';
//   - if executeScan panics, the panic is recovered, logged, and the row is
//     marked failed with the message "scan crashed".
//
// Terminal writes use a context detached from the scan context so that they
// still go through when the scan context has been cancelled in the
// meantime. They are bounded by a timeout and their failures are logged
// instead of being dropped.
func (s *Service) runScan(externalUserID, ncUserID, bookID, scanID string) {
	const finalWriteTimeout = 30 * time.Second

	scanCtx, cancel := context.WithCancel(context.Background())
	s.scanCancels.Store(scanID, cancel)
	defer func() {
		cancel()
		s.scanCancels.Delete(scanID)
		// recover must be called directly in this deferred function.
		if r := recover(); r != nil {
			s.logger.Error("contact discovery scan panicked", "scan_id", scanID, "panic", r)
			crashCtx, crashCancel := context.WithTimeout(context.Background(), finalWriteTimeout)
			defer crashCancel()
			if _, execErr := s.db.Exec(crashCtx, `
UPDATE contact_discovery_scans
SET status = 'failed', phase = 'done',
error_message = 'scan crashed',
completed_at = NOW(), updated_at = NOW()
WHERE id = $1::uuid AND user_id = (SELECT id FROM users WHERE external_id = $2)
AND status = 'running'
`, scanID, externalUserID); execErr != nil {
				s.logger.Error("contact discovery scan status update failed", "scan_id", scanID, "error", execErr)
			}
		}
	}()

	err := s.executeScan(scanCtx, externalUserID, ncUserID, bookID, scanID)
	if errors.Is(err, context.Canceled) {
		return
	}

	status := ScanCompleted
	phase := PhaseDone
	errMsg := ""
	if err != nil {
		status = ScanFailed
		errMsg = err.Error()
		s.logger.Error("contact discovery scan failed", "scan_id", scanID, "error", err)
	}

	// Detached from scanCtx: a cancellation racing with the end of the scan
	// must not prevent the terminal state from being recorded.
	finalCtx, finalCancel := context.WithTimeout(context.Background(), finalWriteTimeout)
	defer finalCancel()
	if _, execErr := s.db.Exec(finalCtx, `
UPDATE contact_discovery_scans
SET status = $3, phase = $4, error_message = NULLIF($5, ''), completed_at = NOW(), updated_at = NOW()
WHERE id = $1::uuid AND user_id = (SELECT id FROM users WHERE external_id = $2)
AND status = 'running'
`, scanID, externalUserID, status, phase, errMsg); execErr != nil {
		s.logger.Error("contact discovery scan status update failed", "scan_id", scanID, "error", execErr)
	}
}
// enrichHeartbeat re-publishes the enrichment progress of a scan every three
// seconds until ctx is done, so the scan row keeps being touched while a
// slow enrichment call is in flight.
//
// The reported position is enrichDone+1 (the item currently being
// processed), capped at enrichTotal. Errors from updateScanProgress are
// ignored: the heartbeat is purely informational.
func (s *Service) enrichHeartbeat(ctx context.Context, scanID, externalUserID string, messagesScanned, enrichDone, totalMessages, enrichTotal int) {
	// All inputs are fixed for the lifetime of this heartbeat, so the
	// position to report is computed once.
	reported := enrichDone + 1
	if reported > enrichTotal {
		reported = enrichTotal
	}

	tick := time.NewTicker(3 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			_ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseEnriching, messagesScanned, reported, totalMessages, enrichTotal)
		case <-ctx.Done():
			return
		}
	}
}
// executeScan performs the work of one discovery scan and returns the first
// fatal error, or nil on success.
//
// Steps, in order:
//  1. Load the user's rejection keys and the e-mails of already accepted
//     profiles.
//  2. Walk the user's messages in batches, aggregating per-address
//     statistics into aggs and publishing scanning progress.
//  3. Upsert one discovered profile per aggregated address, replace its
//     stored signatures (at most five), and mark profiles that will not be
//     enriched as skipped.
//  4. Enrich the selected candidates through the LLM, one at a time, with a
//     delay between calls and a heartbeat while each call is in flight.
//  5. Assign person groups, infer missing companies, and write the final
//     counters with phase 'done'.
//
// Progress updates and most secondary writes are best-effort: their errors
// are deliberately ignored so that they cannot abort the scan. Cancellation
// of ctx is checked at the top of each profile and enrichment iteration and
// returned as ctx.Err().
func (s *Service) executeScan(ctx context.Context, externalUserID, ncUserID, bookID, scanID string) error {
	rejections, err := s.loadRejections(ctx, externalUserID)
	if err != nil {
		return err
	}
	existingEmails, err := s.loadExistingContactEmails(ctx, externalUserID)
	if err != nil {
		return err
	}

	// The total only feeds progress reporting, so a failed count is
	// tolerated and leaves totalMessages at 0.
	var totalMessages int
	_ = s.db.QueryRow(ctx, `
SELECT COUNT(*)::int FROM messages m
JOIN mail_accounts ma ON m.account_id = ma.id
WHERE ma.user_id = (SELECT id FROM users WHERE external_id = $1)
`, externalUserID).Scan(&totalMessages)

	// Phase: scanning. Batches are requested starting after lastID, the ID of
	// the last row processed so far.
	aggs := map[string]*addressAgg{}
	messagesScanned := 0
	lastID := ""
	for {
		batch, err := s.scanMessageBatch(ctx, externalUserID, lastID, scanBatchSize)
		if err != nil {
			return err
		}
		if len(batch) == 0 {
			break
		}
		for _, row := range batch {
			s.processMessageRow(row, aggs, rejections, existingEmails)
			lastID = row.ID
			messagesScanned++
			if messagesScanned%scanProgressEvery == 0 {
				_ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseScanning, messagesScanned, 0, totalMessages, 0)
			}
		}
		_ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseScanning, messagesScanned, 0, totalMessages, 0)
		// A short batch means the source is exhausted; skip the extra query.
		if len(batch) < scanBatchSize {
			break
		}
	}

	// Phase: profiles.
	profilesTotal := len(aggs)
	_ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseProfiles, messagesScanned, 0, totalMessages, profilesTotal)

	// A failure to load LLM settings yields zero-value settings.
	// NOTE(review): llmEnabled only gates the "skipped" marking below; the
	// enrichment loop further down still runs for every candidate when no
	// provider resolves — confirm enrichWithLLMTimeout handles that case.
	llmSettings, _ := s.loadLLMSettings(ctx, externalUserID)
	llmEnabled := llmSettingsHasProvider(llmSettings)
	enrichCandidates := selectPreEnrichCandidates(aggs, maxLLMEnrichPerScan)
	enrichSet := enrichCandidateSet(enrichCandidates)

	profilesFound := 0
	profileIndex := 0
	profileIDs := make(map[string]string, len(aggs))
	for email, agg := range aggs {
		if err := ctx.Err(); err != nil {
			return err
		}
		isML, isDisp, isSpamHeavy, reason := classifyAddress(agg)
		allEmailsJSON, _ := json.Marshal([]EmailEntry{{
			Email: email, DisplayName: agg.DisplayName, MessageCount: agg.MessageCount,
		}})
		accountsJSON, _ := accountHitsToJSON(agg.Accounts)

		// Upsert keyed on (user_id, primary_email). On conflict the counters
		// and classification are refreshed, an empty display name does not
		// overwrite a stored one, and a status of rejected / ignored /
		// blocked / accepted is preserved.
		var profileID string
		err := s.db.QueryRow(ctx, `
INSERT INTO contact_discovered_profiles (
user_id, scan_id, display_name, primary_email, all_emails,
message_count, sent_count, received_count,
outbound_count, inbound_from_cc_count, copresence_cc_bcc_count,
spam_count, forwarded_count,
is_mailing_list, is_disposable, is_spam_heavy, classification_reason,
last_message_at, enrichment_status, status, detected_in_accounts
)
VALUES (
(SELECT id FROM users WHERE external_id = $1), $2::uuid, $3, $4, $5::jsonb,
$6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, NULLIF($17, ''), $18,
'pending', 'suggested', $19::jsonb
)
ON CONFLICT (user_id, primary_email) DO UPDATE SET
scan_id = EXCLUDED.scan_id,
display_name = CASE WHEN EXCLUDED.display_name != '' THEN EXCLUDED.display_name ELSE contact_discovered_profiles.display_name END,
all_emails = EXCLUDED.all_emails,
message_count = EXCLUDED.message_count,
sent_count = EXCLUDED.sent_count,
received_count = EXCLUDED.received_count,
outbound_count = EXCLUDED.outbound_count,
inbound_from_cc_count = EXCLUDED.inbound_from_cc_count,
copresence_cc_bcc_count = EXCLUDED.copresence_cc_bcc_count,
spam_count = EXCLUDED.spam_count,
forwarded_count = EXCLUDED.forwarded_count,
is_mailing_list = EXCLUDED.is_mailing_list,
is_disposable = EXCLUDED.is_disposable,
is_spam_heavy = EXCLUDED.is_spam_heavy,
classification_reason = EXCLUDED.classification_reason,
last_message_at = EXCLUDED.last_message_at,
detected_in_accounts = EXCLUDED.detected_in_accounts,
status = CASE
WHEN contact_discovered_profiles.status IN ('rejected', 'ignored', 'blocked', 'accepted')
THEN contact_discovered_profiles.status
ELSE 'suggested'
END,
updated_at = NOW()
RETURNING id::text
`, externalUserID, scanID, agg.DisplayName, email, string(allEmailsJSON),
			agg.MessageCount, agg.SentCount, agg.ReceivedCount,
			agg.OutboundCount, agg.InboundFromCCCount, agg.CopresenceCCBCCCount,
			agg.SpamCount, agg.ForwardedCount,
			isML, isDisp, isSpamHeavy, reason, agg.LastSeen, string(accountsJSON),
		).Scan(&profileID)
		if err != nil {
			return err
		}
		profileIDs[email] = profileID
		profilesFound++
		profileIndex++

		// Keep at most five signatures per profile; stored ones are replaced
		// only when this scan found at least one.
		sigs := agg.Signatures
		if len(sigs) > 5 {
			sigs = sigs[:5]
		}
		if len(sigs) > 0 {
			_, _ = s.db.Exec(ctx, `DELETE FROM contact_discovered_signatures WHERE profile_id = $1::uuid`, profileID)
			for _, sig := range sigs {
				_, _ = s.db.Exec(ctx, `
INSERT INTO contact_discovered_signatures (profile_id, message_id, signature_text, message_date, confidence)
VALUES ($1::uuid, NULLIF($2, '')::uuid, $3, $4, $5)
`, profileID, sig.MessageID, sig.SignatureText, sig.MessageDate, sig.Confidence)
			}
		}

		// Profiles that are not candidates, have no signature, or belong to
		// a user without a usable LLM provider are marked skipped.
		if !enrichSet[email] || !llmEnabled || len(sigs) == 0 {
			_, _ = s.db.Exec(ctx, `
UPDATE contact_discovered_profiles SET enrichment_status = $2 WHERE id = $1::uuid
`, profileID, EnrichSkipped)
		}
		if profileIndex%profileProgressEvery == 0 {
			_ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseProfiles, messagesScanned, profileIndex, totalMessages, profilesTotal)
		}
	}
	_ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseProfiles, messagesScanned, profilesTotal, totalMessages, profilesTotal)

	// Phase: enriching.
	enrichDone := 0
	enrichTotal := len(enrichCandidates)
	for _, c := range enrichCandidates {
		if err := ctx.Err(); err != nil {
			return err
		}
		email := c.email
		agg := c.agg
		profileID := profileIDs[email]
		if profileID == "" {
			enrichDone++
			continue
		}

		// Report the candidate currently being worked on (1-based, capped).
		progress := enrichDone + 1
		if progress > enrichTotal {
			progress = enrichTotal
		}
		_ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseEnriching, messagesScanned, progress, totalMessages, enrichTotal)

		// NOTE(review): presumably tops up agg.Signatures from stored
		// messages — confirm in fillSignaturesFromStoredMessages.
		s.fillSignaturesFromStoredMessages(ctx, email, agg)
		sigs := agg.Signatures
		if len(sigs) > 5 {
			sigs = sigs[:5]
		}
		if len(sigs) == 0 {
			_, _ = s.db.Exec(ctx, `
UPDATE contact_discovered_profiles SET enrichment_status = $2 WHERE id = $1::uuid
`, profileID, EnrichSkipped)
			enrichDone++
			continue
		}
		sigEntries := make([]SignatureEntry, 0, len(sigs))
		for _, sig := range sigs {
			sigEntries = append(sigEntries, SignatureEntry{
				MessageID:     sig.MessageID,
				SignatureText: sig.SignatureText,
				MessageDate:   sig.MessageDate,
				Confidence:    sig.Confidence,
			})
		}

		// Space out LLM calls; the first processed candidate is not delayed.
		if enrichDone > 0 {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(llmEnrichDelay):
			}
		}

		// The heartbeat context is a child of the scan context, so the
		// goroutine also stops when the scan is cancelled or when a panic
		// unwinds past heartbeatCancel and runScan cancels the scan context.
		heartbeatCtx, heartbeatCancel := context.WithCancel(ctx)
		go s.enrichHeartbeat(heartbeatCtx, scanID, externalUserID, messagesScanned, enrichDone, totalMessages, enrichTotal)
		enriched, enrichErr := enrichWithLLMTimeout(ctx, s.llm, llmSettings, email, agg.DisplayName, sigEntries, llmEnrichTimeout)
		heartbeatCancel()
		if enrichErr != nil {
			// A failed enrichment is recorded on the profile and does not
			// abort the scan.
			s.logger.Warn("llm enrichment failed", "email", email, "error", enrichErr)
			_, _ = s.db.Exec(ctx, `
UPDATE contact_discovered_profiles SET enrichment_status = 'failed' WHERE id = $1::uuid
`, profileID)
			enrichDone++
			continue
		}
		_ = s.applyEnrichmentResults(ctx, externalUserID, profileID, email, enriched, ncUserID, bookID, rejections)
		enrichDone++
		_ = s.updateScanProgress(ctx, scanID, externalUserID, PhaseEnriching, messagesScanned, enrichDone, totalMessages, enrichTotal)
	}

	_ = s.assignPersonGroups(ctx, externalUserID)
	s.inferMissingCompanies(ctx, externalUserID, ncUserID, bookID)

	// Final counters. The status column is left for the caller to set.
	_, err = s.db.Exec(ctx, `
UPDATE contact_discovery_scans
SET messages_scanned = $3, profiles_found = $4, profiles_total = $5,
total_messages = $6, phase = 'done', updated_at = NOW()
WHERE id = $1::uuid AND user_id = (SELECT id FROM users WHERE external_id = $2)
`, scanID, externalUserID, messagesScanned, profilesFound, profilesTotal, totalMessages)
	return err
}
// llmSettingsHasProvider reports whether a provider can be resolved from the
// given settings when no explicit provider name is requested.
func llmSettingsHasProvider(settings llm.Settings) bool {
	if _, _, err := llm.ResolveProvider(settings, ""); err != nil {
		return false
	}
	return true
}
// authInfo is the part of a message's JSON-encoded auth metadata that
// parseAuthInfo decodes.
type authInfo struct {
// ListUnsubscribe, when non-blank, makes rowIsMailingList treat the
// message as mailing-list traffic.
ListUnsubscribe string `json:"list_unsubscribe"`
// MailedBy is decoded but not read in the visible part of this file.
MailedBy string `json:"mailed_by"`
}
// parseAuthInfo decodes raw as JSON into an authInfo. Decoding is
// best-effort: the decode error is discarded and whatever was populated is
// returned, which is the zero value for empty or syntactically invalid
// input.
func parseAuthInfo(raw []byte) authInfo {
	info := authInfo{}
	// Error intentionally ignored; callers treat missing metadata as empty.
	_ = json.Unmarshal(raw, &info)
	return info
}
// rowIsMailingList reports whether the message looks like mailing-list
// traffic: either its auth metadata carries a non-blank List-Unsubscribe
// value, or at least one sender address belongs to a domain that
// isMailingListDomain recognises.
func (row messageRow) rowIsMailingList(auth authInfo) bool {
	if unsub := strings.TrimSpace(auth.ListUnsubscribe); unsub != "" {
		return true
	}

	senders := parseAddresses(row.FromAddr)
	for i := range senders {
		domain := emailDomain(senders[i].Address)
		if isMailingListDomain(domain) {
			return true
		}
	}
	return false
}
// parseAddresses decodes a JSON array of address objects and normalises each
// Address to trimmed lower case. It returns nil for empty input and for
// input that does not decode.
func parseAddresses(raw []byte) []mailAddress {
	var parsed []mailAddress
	if len(raw) == 0 || json.Unmarshal(raw, &parsed) != nil {
		return nil
	}
	for idx := range parsed {
		entry := &parsed[idx]
		entry.Address = strings.ToLower(strings.TrimSpace(entry.Address))
	}
	return parsed
}
// isSpamMessage reports whether a message is marked as spam. A flag counts
// when it equals "spam" or "junk" ignoring case; a label counts when its
// lower-cased form contains "spam", "junk" or "indésirable".
func isSpamMessage(flags, labels []string) bool {
	for i := range flags {
		switch {
		case strings.EqualFold(flags[i], "spam"), strings.EqualFold(flags[i], "junk"):
			return true
		}
	}

	// Labels are free-form folder or tag names, so substring matching is used.
	markers := [...]string{"spam", "junk", "indésirable"}
	for i := range labels {
		lowered := strings.ToLower(labels[i])
		for _, marker := range markers {
			if strings.Contains(lowered, marker) {
				return true
			}
		}
	}
	return false
}
// isOwnAddress reports whether email is the account's own address, comparing
// both values case-insensitively after trimming surrounding whitespace. Two
// blank values compare equal.
func isOwnAddress(email, accountEmail string) bool {
	candidate := strings.TrimSpace(email)
	own := strings.TrimSpace(accountEmail)
	return strings.EqualFold(candidate, own)
}
// accountIsMessageRecipient reports whether accountEmail appears among the
// To, Cc or Bcc addresses of a message. Both sides are compared after
// trimming and lower-casing; a blank accountEmail never matches.
func accountIsMessageRecipient(accountEmail string, to, cc, bcc []mailAddress) bool {
	want := strings.ToLower(strings.TrimSpace(accountEmail))
	if want == "" {
		return false
	}

	listed := func(recipients []mailAddress) bool {
		for i := range recipients {
			if strings.ToLower(strings.TrimSpace(recipients[i].Address)) == want {
				return true
			}
		}
		return false
	}
	return listed(to) || listed(cc) || listed(bcc)
}
// loadRejections returns the set of rejection keys stored for the user with
// the given external ID. A query or row-scan failure yields a nil map and
// the error; an iteration error reported after the loop is returned together
// with the keys collected so far.
func (s *Service) loadRejections(ctx context.Context, externalUserID string) (map[string]bool, error) {
	keys := make(map[string]bool)

	rows, queryErr := s.db.Query(ctx, `
SELECT rejection_key FROM contact_discovery_rejections
WHERE user_id = (SELECT id FROM users WHERE external_id = $1)
`, externalUserID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	for rows.Next() {
		var rejectionKey string
		if scanErr := rows.Scan(&rejectionKey); scanErr != nil {
			return nil, scanErr
		}
		keys[rejectionKey] = true
	}
	return keys, rows.Err()
}
// loadExistingContactEmails returns the lower-cased primary e-mails of the
// user's discovered profiles whose status is 'accepted'.
//
// Only accepted profiles are considered here; per the original note, real
// address-book contacts are handled by the CardDAV sync on the frontend, and
// the backend keeps a linked_contact_uid for accepted profiles.
//
// The load is best-effort: on any database error the entries collected so
// far (possibly none) are returned with a nil error so that a scan can still
// proceed. Failures are logged rather than silently discarded, including
// iteration errors reported by rows.Err.
func (s *Service) loadExistingContactEmails(ctx context.Context, externalUserID string) (map[string]bool, error) {
	out := map[string]bool{}
	rows, err := s.db.Query(ctx, `
SELECT lower(primary_email) FROM contact_discovered_profiles
WHERE user_id = (SELECT id FROM users WHERE external_id = $1) AND status = 'accepted'
`, externalUserID)
	if err != nil {
		s.logger.Warn("loading accepted contact emails failed", "error", err)
		return out, nil
	}
	defer rows.Close()

	for rows.Next() {
		var email string
		if err := rows.Scan(&email); err != nil {
			s.logger.Warn("reading accepted contact email failed", "error", err)
			return out, nil
		}
		out[email] = true
	}
	// Next returning false can hide a mid-stream failure; surface it.
	if err := rows.Err(); err != nil {
		s.logger.Warn("iterating accepted contact emails failed", "error", err)
	}
	return out, nil
}
// loadLLMSettings reads the "llm" object from the user's stored preferences
// and decodes it into llm.Settings. A user without a settings row, or
// without an "llm" key, yields settings decoded from an empty JSON object.
//
// A query or row-scan error is returned with zero-value settings. A JSON
// decode error is ignored and whatever was populated is returned.
func (s *Service) loadLLMSettings(ctx context.Context, externalUserID string) (llm.Settings, error) {
	var (
		payload  []byte
		settings llm.Settings
	)

	row := s.db.QueryRow(ctx, `
SELECT COALESCE(s.preferences->'llm', '{}'::jsonb)
FROM users u
LEFT JOIN settings s ON s.user_id = u.id
WHERE u.external_id = $1
`, externalUserID)
	if err := row.Scan(&payload); err != nil {
		return llm.Settings{}, err
	}

	// Decode failure is tolerated: malformed preferences behave like defaults.
	_ = json.Unmarshal(payload, &settings)
	return settings, nil
}