- Introduced new endpoints for contact discovery, including scanning, listing, and managing discovered contacts. - Implemented retry logic for handling missing DAV credentials during contact operations. - Added public share functionality for drive API, allowing users to manage public shares, including upload, delete, and rename operations. - Updated Nextcloud configuration to support public share links and improved error handling for public share permissions. - Enhanced logging and validation across contact and drive APIs for better error tracking and user feedback. - Added tests for new contact matching and ranking functionalities to ensure accuracy and reliability.
397 lines
11 KiB
Go
397 lines
11 KiB
Go
package discovery
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/jackc/pgx/v5"
)
|
|
|
|
// Tuning values for contact discovery scans.
//
// Only maxSignatureMsgsPerEmail is referenced in the visible part of this
// file (by trackFromMessage). The other descriptions are inferred from the
// constant names — confirm against their call sites.
const (
	// Batch size and progress-reporting cadence (presumably counted in
	// messages and profiles respectively).
	scanBatchSize        = 500
	scanProgressEvery    = 100
	profileProgressEvery = 25

	// Upper bounds applied while collecting data.
	maxBodyChars = 12_000
	// maxSignatureMsgsPerEmail caps how many "from" message references are
	// retained per address.
	maxSignatureMsgsPerEmail = 3

	// LLM enrichment limits (presumably per scan / per contact).
	maxLLMEnrichPerScan = 25
	minMessagesForLLM   = 2
	llmEnrichDelay      = 1500 * time.Millisecond
	llmEnrichTimeout    = 45 * time.Second

	// NOTE(review): failStaleScans uses a hard-coded 30-minute interval in
	// its SQL rather than this value — confirm which one is intended.
	staleScanAfter = 2 * time.Hour
)
|
|
|
|
// ErrScanAlreadyRunning is the sentinel error for a contact discovery scan
// that is already in progress. Compare against it with errors.Is.
var ErrScanAlreadyRunning = errors.New("contact discovery scan already running")
|
|
|
|
func (s *Service) StartScan(ctx context.Context, externalUserID, ncUserID, bookID string) (Scan, error) {
|
|
if s.db == nil {
|
|
return Scan{}, fmt.Errorf("database unavailable")
|
|
}
|
|
if bookID == "" {
|
|
bookID = "contacts"
|
|
}
|
|
|
|
if err := s.failStaleScans(ctx, externalUserID); err != nil {
|
|
return Scan{}, err
|
|
}
|
|
|
|
if active, err := s.findActiveScan(ctx, externalUserID); err != nil {
|
|
return Scan{}, err
|
|
} else if active != nil {
|
|
return *active, nil
|
|
}
|
|
|
|
var totalMessages int
|
|
_ = s.db.QueryRow(ctx, `
|
|
SELECT COUNT(*)::int
|
|
FROM messages m
|
|
JOIN mail_accounts ma ON m.account_id = ma.id
|
|
WHERE ma.user_id = (SELECT id FROM users WHERE external_id = $1)
|
|
`, externalUserID).Scan(&totalMessages)
|
|
|
|
var scanID string
|
|
err := s.db.QueryRow(ctx, `
|
|
INSERT INTO contact_discovery_scans (
|
|
user_id, status, phase, total_messages, book_id, nc_user_id
|
|
)
|
|
VALUES (
|
|
(SELECT id FROM users WHERE external_id = $1),
|
|
'running', 'scanning_messages', $2, $3, $4
|
|
)
|
|
RETURNING id::text
|
|
`, externalUserID, totalMessages, bookID, ncUserID).Scan(&scanID)
|
|
if err != nil {
|
|
return Scan{}, err
|
|
}
|
|
|
|
go s.runScan(externalUserID, ncUserID, bookID, scanID)
|
|
|
|
scan, err := s.GetScan(ctx, externalUserID, scanID)
|
|
if err != nil {
|
|
return Scan{
|
|
ID: scanID,
|
|
Status: ScanRunning,
|
|
Phase: PhaseScanning,
|
|
TotalMessages: totalMessages,
|
|
StartedAt: time.Now().UTC(),
|
|
UpdatedAt: time.Now().UTC(),
|
|
}, nil
|
|
}
|
|
return scan, nil
|
|
}
|
|
|
|
func (s *Service) CancelActiveScan(ctx context.Context, externalUserID string) error {
|
|
active, err := s.findActiveScan(ctx, externalUserID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if active != nil {
|
|
if cancel, ok := s.scanCancels.LoadAndDelete(active.ID); ok {
|
|
cancel.(context.CancelFunc)()
|
|
}
|
|
}
|
|
_, err = s.db.Exec(ctx, `
|
|
UPDATE contact_discovery_scans sc
|
|
SET status = 'failed',
|
|
phase = 'done',
|
|
error_message = 'Analyse annulée',
|
|
completed_at = NOW(),
|
|
updated_at = NOW()
|
|
FROM users u
|
|
WHERE sc.user_id = u.id
|
|
AND u.external_id = $1
|
|
AND sc.status IN ('pending', 'running')
|
|
`, externalUserID)
|
|
return err
|
|
}
|
|
|
|
func (s *Service) GetActiveScan(ctx context.Context, externalUserID string) (*Scan, error) {
|
|
if err := s.failStaleScans(ctx, externalUserID); err != nil {
|
|
return nil, err
|
|
}
|
|
return s.findActiveScan(ctx, externalUserID)
|
|
}
|
|
|
|
func (s *Service) findActiveScan(ctx context.Context, externalUserID string) (*Scan, error) {
|
|
var scanID string
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT sc.id::text
|
|
FROM contact_discovery_scans sc
|
|
JOIN users u ON sc.user_id = u.id
|
|
WHERE u.external_id = $1 AND sc.status IN ('pending', 'running')
|
|
ORDER BY sc.started_at DESC
|
|
LIMIT 1
|
|
`, externalUserID).Scan(&scanID)
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
scan, err := s.GetScan(ctx, externalUserID, scanID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &scan, nil
|
|
}
|
|
|
|
func (s *Service) failStaleScans(ctx context.Context, externalUserID string) error {
|
|
_, err := s.db.Exec(ctx, `
|
|
UPDATE contact_discovery_scans sc
|
|
SET status = 'failed',
|
|
phase = 'done',
|
|
error_message = 'scan timed out (interrupted or stale)',
|
|
completed_at = NOW(),
|
|
updated_at = NOW()
|
|
FROM users u
|
|
WHERE sc.user_id = u.id
|
|
AND u.external_id = $1
|
|
AND sc.status IN ('pending', 'running')
|
|
AND sc.updated_at < NOW() - INTERVAL '30 minutes'
|
|
`, externalUserID)
|
|
return err
|
|
}
|
|
|
|
func (s *Service) updateScanProgress(ctx context.Context, scanID, externalUserID string, phase ScanPhase, messagesScanned, profilesDone, totalMessages, profilesTotal int) error {
|
|
_, err := s.db.Exec(ctx, `
|
|
UPDATE contact_discovery_scans sc
|
|
SET phase = $3,
|
|
messages_scanned = $4,
|
|
profiles_found = $5,
|
|
total_messages = CASE WHEN $6 > 0 THEN $6 ELSE sc.total_messages END,
|
|
profiles_total = CASE WHEN $7 > 0 THEN $7 ELSE sc.profiles_total END,
|
|
updated_at = NOW()
|
|
FROM users u
|
|
WHERE sc.id = $1::uuid AND sc.user_id = u.id AND u.external_id = $2
|
|
`, scanID, externalUserID, phase, messagesScanned, profilesDone, totalMessages, profilesTotal)
|
|
return err
|
|
}
|
|
|
|
func scanProgressPercent(phase ScanPhase, messagesScanned, totalMessages, profilesDone, profilesTotal int) float64 {
|
|
switch phase {
|
|
case PhaseScanning:
|
|
if totalMessages <= 0 {
|
|
return 5
|
|
}
|
|
return 5 + float64(messagesScanned)/float64(totalMessages)*60
|
|
case PhaseProfiles:
|
|
if profilesTotal <= 0 {
|
|
return 68
|
|
}
|
|
return 65 + float64(profilesDone)/float64(profilesTotal)*5
|
|
case PhaseEnriching:
|
|
if profilesTotal <= 0 {
|
|
return 75
|
|
}
|
|
return 70 + float64(profilesDone)/float64(profilesTotal)*28
|
|
case PhaseDone:
|
|
return 100
|
|
default:
|
|
return 0
|
|
}
|
|
}
|
|
|
|
func accountHitsToJSON(accounts map[string]*AccountDetection) ([]byte, error) {
|
|
if len(accounts) == 0 {
|
|
return []byte("[]"), nil
|
|
}
|
|
list := make([]AccountDetection, 0, len(accounts))
|
|
for _, a := range accounts {
|
|
list = append(list, *a)
|
|
}
|
|
sort.Slice(list, func(i, j int) bool {
|
|
if list[i].MessageCount == list[j].MessageCount {
|
|
return list[i].AccountEmail < list[j].AccountEmail
|
|
}
|
|
return list[i].MessageCount > list[j].MessageCount
|
|
})
|
|
return json.Marshal(list)
|
|
}
|
|
|
|
func (s *Service) recordAccountHit(agg *addressAgg, row messageRow) {
|
|
if agg.Accounts == nil {
|
|
agg.Accounts = map[string]*AccountDetection{}
|
|
}
|
|
hit, ok := agg.Accounts[row.AccountID]
|
|
if !ok {
|
|
hit = &AccountDetection{
|
|
AccountID: row.AccountID,
|
|
AccountEmail: row.AccountEmail,
|
|
AccountName: row.AccountName,
|
|
}
|
|
agg.Accounts[row.AccountID] = hit
|
|
}
|
|
hit.MessageCount++
|
|
}
|
|
|
|
func (s *Service) scanMessageBatch(ctx context.Context, externalUserID, afterID string, limit int) ([]messageRow, error) {
|
|
var rows []messageRow
|
|
query := `
|
|
SELECT m.id::text, m.subject, m.from_addr, m.to_addrs, m.cc_addrs, m.bcc_addrs,
|
|
m.reply_to, COALESCE(m.snippet, ''), ''::text, ''::text,
|
|
m.date, m.flags, m.labels, COALESCE(m.auth_info, '{}'::jsonb),
|
|
COALESCE(mf.folder_type, ''), ma.id::text,
|
|
COALESCE(ma.email, ''), COALESCE(ma.name, '')
|
|
FROM messages m
|
|
JOIN mail_accounts ma ON m.account_id = ma.id
|
|
LEFT JOIN mail_folders mf ON m.folder_id = mf.id
|
|
WHERE ma.user_id = (SELECT id FROM users WHERE external_id = $1)
|
|
`
|
|
args := []any{externalUserID}
|
|
if afterID != "" {
|
|
query += ` AND m.id > $2::uuid`
|
|
args = append(args, afterID)
|
|
}
|
|
query += ` ORDER BY m.id ASC LIMIT `
|
|
if afterID != "" {
|
|
query += `$3`
|
|
args = append(args, limit)
|
|
} else {
|
|
query += `$2`
|
|
args = append(args, limit)
|
|
}
|
|
|
|
dbRows, err := s.db.Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer dbRows.Close()
|
|
|
|
for dbRows.Next() {
|
|
var row messageRow
|
|
var authJSON []byte
|
|
if err := dbRows.Scan(
|
|
&row.ID, &row.Subject, &row.FromAddr, &row.ToAddrs,
|
|
&row.CcAddrs, &row.BccAddrs, &row.ReplyTo,
|
|
&row.Snippet, &row.BodyText, &row.BodyHTML, &row.Date, &row.Flags, &row.Labels,
|
|
&authJSON, &row.FolderType, &row.AccountID, &row.AccountEmail, &row.AccountName,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
row.AuthInfo = authJSON
|
|
if row.AccountName == "" {
|
|
row.AccountName = row.AccountEmail
|
|
}
|
|
rows = append(rows, row)
|
|
}
|
|
return rows, dbRows.Err()
|
|
}
|
|
|
|
func (s *Service) processMessageRow(row messageRow, aggs map[string]*addressAgg, rejections, existingEmails map[string]bool) {
|
|
isSpam := isSpamMessage(row.Flags, row.Labels)
|
|
isSent := row.FolderType == "sent" || strings.EqualFold(row.FolderType, "sent")
|
|
auth := parseAuthInfo(row.AuthInfo)
|
|
hasListUnsub := strings.TrimSpace(auth.ListUnsubscribe) != ""
|
|
|
|
fromAddrs := parseAddresses(row.FromAddr)
|
|
toAddrs := parseAddresses(row.ToAddrs)
|
|
ccAddrs := parseAddresses(row.CcAddrs)
|
|
bccAddrs := parseAddresses(row.BccAddrs)
|
|
replyAddrs := parseAddresses(row.ReplyTo)
|
|
|
|
type addrRole struct {
|
|
email string
|
|
name string
|
|
role string
|
|
}
|
|
var all []addrRole
|
|
for _, a := range fromAddrs {
|
|
all = append(all, addrRole{a.Address, a.Name, "from"})
|
|
}
|
|
for _, a := range toAddrs {
|
|
all = append(all, addrRole{a.Address, a.Name, "to"})
|
|
}
|
|
for _, a := range ccAddrs {
|
|
all = append(all, addrRole{a.Address, a.Name, "cc"})
|
|
}
|
|
for _, a := range bccAddrs {
|
|
all = append(all, addrRole{a.Address, a.Name, "bcc"})
|
|
}
|
|
for _, a := range replyAddrs {
|
|
all = append(all, addrRole{a.Address, a.Name, "reply_to"})
|
|
}
|
|
for _, email := range detectForwardedAddresses(row.Snippet, "") {
|
|
all = append(all, addrRole{email, "", "forwarded"})
|
|
}
|
|
|
|
userIsRecipient := accountIsMessageRecipient(row.AccountEmail, toAddrs, ccAddrs, bccAddrs)
|
|
|
|
for _, ar := range all {
|
|
email := strings.ToLower(strings.TrimSpace(ar.email))
|
|
if email == "" || isOwnAddress(email, row.AccountEmail) {
|
|
continue
|
|
}
|
|
if rejections["email:"+email] || existingEmails[email] || isNoReplyEmail(email) {
|
|
continue
|
|
}
|
|
|
|
agg, ok := aggs[email]
|
|
if !ok {
|
|
agg = &addressAgg{Email: email, Roles: map[string]struct{}{}, Accounts: map[string]*AccountDetection{}}
|
|
aggs[email] = agg
|
|
}
|
|
agg.MessageCount++
|
|
agg.Roles[ar.role] = struct{}{}
|
|
s.recordAccountHit(agg, row)
|
|
if ar.name != "" && agg.DisplayName == "" {
|
|
agg.DisplayName = ar.name
|
|
}
|
|
if isSent {
|
|
agg.SentCount++
|
|
if ar.role == "to" || ar.role == "cc" || ar.role == "bcc" {
|
|
agg.OutboundCount++
|
|
}
|
|
} else {
|
|
agg.ReceivedCount++
|
|
if ar.role == "from" || ar.role == "cc" {
|
|
agg.InboundFromCCCount++
|
|
}
|
|
if userIsRecipient && (ar.role == "cc" || ar.role == "bcc") {
|
|
agg.CopresenceCCBCCCount++
|
|
}
|
|
}
|
|
if isSpam {
|
|
agg.SpamCount++
|
|
}
|
|
if ar.role == "forwarded" {
|
|
agg.ForwardedCount++
|
|
}
|
|
if hasListUnsub {
|
|
agg.ListUnsub++
|
|
}
|
|
if row.rowIsMailingList(auth) {
|
|
agg.MailingList++
|
|
}
|
|
if row.Date.After(agg.LastSeen) {
|
|
agg.LastSeen = row.Date
|
|
}
|
|
if ar.role == "from" {
|
|
agg.trackFromMessage(row.ID, row.Date)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (agg *addressAgg) trackFromMessage(id string, date time.Time) {
|
|
if id == "" {
|
|
return
|
|
}
|
|
for i, ref := range agg.fromMessageRefs {
|
|
if ref.id == id {
|
|
if date.After(ref.date) {
|
|
agg.fromMessageRefs[i].date = date
|
|
}
|
|
return
|
|
}
|
|
}
|
|
agg.fromMessageRefs = append(agg.fromMessageRefs, fromMessageRef{id: id, date: date})
|
|
sort.Slice(agg.fromMessageRefs, func(i, j int) bool {
|
|
return agg.fromMessageRefs[i].date.After(agg.fromMessageRefs[j].date)
|
|
})
|
|
if len(agg.fromMessageRefs) > maxSignatureMsgsPerEmail {
|
|
agg.fromMessageRefs = agg.fromMessageRefs[:maxSignatureMsgsPerEmail]
|
|
}
|
|
}
|