- Refactored AI gateway to utilize new cost management structures for usage tracking. - Replaced deprecated token extraction methods with a unified cost parsing approach. - Enhanced usage fallback mechanisms and introduced detailed usage metrics in responses. - Added new metering functionality to record AI usage and costs effectively. - Updated tests to reflect changes in usage parsing and cost calculations. - Introduced new API endpoints for retrieving AI usage summaries and pricing information.
206 lines
6.4 KiB
Go
206 lines
6.4 KiB
Go
package discovery
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
var (
|
|
ErrProfileNotFound = errors.New("profile not found")
|
|
ErrNoSignatures = errors.New("no signatures available for enrichment")
|
|
ErrAlreadyEnriched = errors.New("profile already enriched")
|
|
ErrLLMNotConfigured = errors.New("llm provider not configured")
|
|
ErrProfileNotSuggested = errors.New("profile is not available for enrichment")
|
|
)
|
|
|
|
type ProfileEnrichResponse struct {
|
|
ProfileID string `json:"profile_id"`
|
|
EnrichmentStatus EnrichmentStatus `json:"enrichment_status"`
|
|
}
|
|
|
|
func (s *Service) StartProfileEnrichment(ctx context.Context, externalUserID, profileID string) (ProfileEnrichResponse, error) {
|
|
sigs, err := s.loadProfileSignatures(ctx, profileID)
|
|
if err != nil {
|
|
return ProfileEnrichResponse{}, err
|
|
}
|
|
if len(sigs) == 0 {
|
|
return ProfileEnrichResponse{}, ErrNoSignatures
|
|
}
|
|
|
|
llmSettings, _ := s.loadLLMSettings(ctx, externalUserID)
|
|
if !llmSettingsHasProvider(llmSettings) {
|
|
return ProfileEnrichResponse{}, ErrLLMNotConfigured
|
|
}
|
|
|
|
tag, err := s.db.Exec(ctx, `
|
|
UPDATE contact_discovered_profiles p
|
|
SET enrichment_status = 'enriching', updated_at = NOW()
|
|
FROM users u
|
|
WHERE p.user_id = u.id
|
|
AND u.external_id = $1
|
|
AND p.id = $2::uuid
|
|
AND p.status = 'suggested'
|
|
AND p.enrichment_status NOT IN ('enriched', 'enriching')
|
|
`, externalUserID, profileID)
|
|
if err != nil {
|
|
return ProfileEnrichResponse{}, err
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
var status ProfileStatus
|
|
var enrichStatus EnrichmentStatus
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT p.status, p.enrichment_status
|
|
FROM contact_discovered_profiles p
|
|
JOIN users u ON p.user_id = u.id
|
|
WHERE u.external_id = $1 AND p.id = $2::uuid
|
|
`, externalUserID, profileID).Scan(&status, &enrichStatus)
|
|
if err != nil {
|
|
if err == pgx.ErrNoRows {
|
|
return ProfileEnrichResponse{}, ErrProfileNotFound
|
|
}
|
|
return ProfileEnrichResponse{}, err
|
|
}
|
|
if status != ProfileSuggested {
|
|
return ProfileEnrichResponse{}, ErrProfileNotSuggested
|
|
}
|
|
if enrichStatus == EnrichEnriched {
|
|
return ProfileEnrichResponse{}, ErrAlreadyEnriched
|
|
}
|
|
if enrichStatus == EnrichEnriching {
|
|
return ProfileEnrichResponse{
|
|
ProfileID: profileID,
|
|
EnrichmentStatus: EnrichEnriching,
|
|
}, nil
|
|
}
|
|
return ProfileEnrichResponse{}, fmt.Errorf("profile not enrichable")
|
|
}
|
|
|
|
ncUserID, bookID := s.resolveDiscoveryNCContext(ctx, externalUserID)
|
|
go s.runProfileEnrichment(externalUserID, profileID, ncUserID, bookID)
|
|
|
|
return ProfileEnrichResponse{
|
|
ProfileID: profileID,
|
|
EnrichmentStatus: EnrichEnriching,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) runProfileEnrichment(externalUserID, profileID, ncUserID, bookID string) {
|
|
ctx := context.Background()
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
s.logger.Error("profile enrichment panicked", "profile_id", profileID, "panic", r)
|
|
_, _ = s.db.Exec(ctx, `
|
|
UPDATE contact_discovered_profiles
|
|
SET enrichment_status = 'failed', updated_at = NOW()
|
|
WHERE id = $1::uuid AND enrichment_status = 'enriching'
|
|
`, profileID)
|
|
}
|
|
}()
|
|
|
|
profile, err := s.getProfileByID(ctx, externalUserID, profileID)
|
|
if err != nil {
|
|
s.logger.Warn("profile enrichment load profile failed", "profile_id", profileID, "error", err)
|
|
s.markProfileEnrichmentFailed(ctx, profileID)
|
|
return
|
|
}
|
|
|
|
sigs, err := s.loadProfileSignatures(ctx, profileID)
|
|
if err != nil || len(sigs) == 0 {
|
|
s.markProfileEnrichmentFailed(ctx, profileID)
|
|
return
|
|
}
|
|
|
|
llmSettings, err := s.loadLLMSettings(ctx, externalUserID)
|
|
if err != nil || !llmSettingsHasProvider(llmSettings) {
|
|
s.markProfileEnrichmentFailed(ctx, profileID)
|
|
return
|
|
}
|
|
|
|
enriched, enrichErr := enrichWithLLMTimeout(
|
|
ctx, s.db, externalUserID, s.llm, llmSettings,
|
|
profile.PrimaryEmail, profile.DisplayName, sigs, llmEnrichTimeout,
|
|
)
|
|
if enrichErr != nil {
|
|
s.logger.Warn("profile enrichment llm failed", "profile_id", profileID, "error", enrichErr)
|
|
s.markProfileEnrichmentFailed(ctx, profileID)
|
|
return
|
|
}
|
|
|
|
rejections, _ := s.loadRejections(ctx, externalUserID)
|
|
if err := s.applyEnrichmentResults(ctx, externalUserID, profileID, profile.PrimaryEmail, enriched, ncUserID, bookID, rejections); err != nil {
|
|
s.logger.Warn("profile enrichment persist failed", "profile_id", profileID, "error", err)
|
|
s.markProfileEnrichmentFailed(ctx, profileID)
|
|
return
|
|
}
|
|
|
|
s.inferMissingCompanies(ctx, externalUserID, ncUserID, bookID)
|
|
}
|
|
|
|
func (s *Service) markProfileEnrichmentFailed(ctx context.Context, profileID string) {
|
|
_, _ = s.db.Exec(ctx, `
|
|
UPDATE contact_discovered_profiles
|
|
SET enrichment_status = 'failed', updated_at = NOW()
|
|
WHERE id = $1::uuid AND enrichment_status = 'enriching'
|
|
`, profileID)
|
|
}
|
|
|
|
func (s *Service) applyEnrichmentResults(
|
|
ctx context.Context,
|
|
externalUserID, profileID, email string,
|
|
enriched *EnrichedContactData,
|
|
ncUserID, bookID string,
|
|
rejections map[string]bool,
|
|
) error {
|
|
enrichedJSON, _ := json.Marshal(enriched)
|
|
_, err := s.db.Exec(ctx, `
|
|
UPDATE contact_discovered_profiles
|
|
SET enrichment_status = 'enriched', enriched_data = $2::jsonb, enriched_at = NOW(), updated_at = NOW()
|
|
WHERE id = $1::uuid
|
|
`, profileID, string(enrichedJSON))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var existingContacts []ncContact
|
|
if ncUserID != "" {
|
|
existingContacts = s.loadNCContacts(ctx, ncUserID, bookID)
|
|
}
|
|
match := findExistingContact(existingContacts, email)
|
|
|
|
var suggestions []Suggestion
|
|
if match != nil {
|
|
suggestions = enrichExistingContactSuggestions(profileID, match.UID, enriched, match)
|
|
} else {
|
|
suggestions = enrichedDataToSuggestions(externalUserID, profileID, enriched)
|
|
}
|
|
|
|
for _, sug := range suggestions {
|
|
if match != nil && suggestionAlreadyOnContact(match, sug) {
|
|
continue
|
|
}
|
|
rejKey := fmt.Sprintf("field:%s:%s:%s", profileID, sug.FieldPath, sug.SuggestedValue)
|
|
if rejections[rejKey] {
|
|
continue
|
|
}
|
|
_, _ = s.db.Exec(ctx, `
|
|
INSERT INTO contact_enrichment_suggestions (
|
|
user_id, profile_id, suggestion_type, field_path, suggested_value, suggested_label, confidence, status
|
|
)
|
|
VALUES (
|
|
(SELECT id FROM users WHERE external_id = $1), $2::uuid, $3, $4, $5, $6, $7, 'pending'
|
|
)
|
|
ON CONFLICT (user_id, profile_id, field_path, suggested_value) DO UPDATE SET
|
|
status = CASE
|
|
WHEN contact_enrichment_suggestions.status = 'rejected' THEN 'rejected'
|
|
ELSE 'pending'
|
|
END,
|
|
confidence = EXCLUDED.confidence
|
|
`, externalUserID, profileID, sug.SuggestionType, sug.FieldPath, sug.SuggestedValue, sug.SuggestedLabel, sug.Confidence)
|
|
}
|
|
return nil
|
|
}
|