ultisuite-backend/internal/api/meet/transcript_processor.go
R3D347HR4Y 3978622050
Some checks are pending
CI / Go tests (push) Waiting to run
CI / Integration tests (push) Waiting to run
CI / DB migrations (push) Waiting to run
refactor(ai): update AI gateway and cost management features
- Refactored AI gateway to utilize new cost management structures for usage tracking.
- Replaced deprecated token extraction methods with a unified cost parsing approach.
- Enhanced usage fallback mechanisms and introduced detailed usage metrics in responses.
- Added new metering functionality to record AI usage and costs effectively.
- Updated tests to reflect changes in usage parsing and cost calculations.
- Introduced new API endpoints for retrieving AI usage summaries and pricing information.
2026-06-16 10:46:33 +02:00

344 lines
9.6 KiB
Go

package meet
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"net/smtp"
"path"
"strings"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/ultisuite/ulti-backend/internal/ai"
"github.com/ultisuite/ulti-backend/internal/llm"
"github.com/ultisuite/ulti-backend/internal/nextcloud"
"github.com/ultisuite/ulti-backend/internal/orgpolicy"
)
// TranscriptProcessor stores meeting transcripts as jobs and runs the
// organisation's configured post-actions (LLM summary, Drive upload, e-mail)
// on them.
type TranscriptProcessor struct {
// db is the pool used for the meet_transcript_jobs and org_settings queries.
db *pgxpool.Pool
// nc uploads transcripts to Nextcloud; Drive saves are skipped when it is nil.
nc *nextcloud.Client
// policy loads the meet policy consulted on every Handle call.
policy *orgpolicy.Loader
// llm is the client used to produce transcript summaries.
llm *llm.Client
// logger is tagged with component=meet-transcript by NewTranscriptProcessor.
logger *slog.Logger
}
// NewTranscriptProcessor wires a TranscriptProcessor around the given database
// pool, Nextcloud client and policy loader. It creates its own LLM client and
// derives its logger from slog.Default, tagged with component=meet-transcript.
func NewTranscriptProcessor(db *pgxpool.Pool, nc *nextcloud.Client, policy *orgpolicy.Loader) *TranscriptProcessor {
	proc := new(TranscriptProcessor)
	proc.db = db
	proc.nc = nc
	proc.policy = policy
	proc.llm = llm.NewClient()
	proc.logger = slog.Default().With("component", "meet-transcript")
	return proc
}
// transcriptJobInput carries everything Handle needs to record and process
// the transcript of one meeting.
type transcriptJobInput struct {
// RoomID identifies the meeting room; it is stored on the job and used in
// the Drive file name and the e-mail subject.
RoomID string
// OrganizerUserID is the user the Drive upload is performed for; a blank
// value is stored as NULL and disables Drive saves.
OrganizerUserID string
// OrganizerEmail is stored on the job (NULL when blank) and used as an
// e-mail recipient and for AI usage attribution.
OrganizerEmail string
// ParticipantEmails is stored on the job as JSON and may be used as e-mail
// recipients depending on the post-action settings.
ParticipantEmails []string
// RawTranscript is the transcript text; it is trimmed before use.
RawTranscript string
// Mode overrides the policy's transcription mode when non-blank.
Mode string
// QueuedAudioURL is stored in the job metadata under "queued_audio_url".
// With mode "queued" and a blank transcript, a non-blank URL makes Handle
// store the job as "queued" instead of rejecting it.
QueuedAudioURL string
}
// Handle records a transcript job for a meeting and, when the transcript text
// is already available, runs the configured post-actions on it.
//
// It returns an error when the policy cannot be loaded, when transcription is
// disabled, when there is no transcript text and the job does not qualify for
// queueing, or when the job row cannot be inserted. A job is stored with
// status "queued" (and no post-actions run) only when the effective mode is
// "queued", the transcript is blank and an audio URL is provided.
func (p *TranscriptProcessor) Handle(ctx context.Context, in transcriptJobInput) error {
	meetPolicy, err := p.policy.MeetPolicy(ctx)
	if err != nil {
		return err
	}
	if !meetPolicy.TranscriptionEnabled {
		return fmt.Errorf("transcription disabled")
	}

	// An explicit mode on the input wins over the policy default.
	mode := strings.TrimSpace(in.Mode)
	if mode == "" {
		mode = meetPolicy.TranscriptionMode
	}

	transcript := strings.TrimSpace(in.RawTranscript)
	hasAudio := strings.TrimSpace(in.QueuedAudioURL) != ""
	queued := mode == "queued" && transcript == "" && hasAudio
	if transcript == "" && !queued {
		return fmt.Errorf("empty transcript")
	}

	status := "completed"
	if queued {
		status = "queued"
	}

	// Marshalling a string slice and a map of strings cannot fail, so the
	// errors are deliberately ignored.
	participants, _ := json.Marshal(in.ParticipantEmails)
	metadata, _ := json.Marshal(map[string]any{"queued_audio_url": in.QueuedAudioURL})

	var jobID string
	row := p.db.QueryRow(ctx, `
		INSERT INTO meet_transcript_jobs (
			room_id, organizer_user_id, organizer_email, mode, status,
			raw_transcript, participant_emails, metadata
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb)
		RETURNING id::text
	`,
		in.RoomID,
		nullIfEmpty(in.OrganizerUserID),
		nullIfEmpty(in.OrganizerEmail),
		mode,
		status,
		transcript,
		participants,
		metadata,
	)
	if err := row.Scan(&jobID); err != nil {
		return fmt.Errorf("insert transcript job: %w", err)
	}

	if queued {
		p.logger.Info("transcript queued for async processing", "job_id", jobID, "room", in.RoomID)
		return nil
	}
	return p.runPostActions(ctx, jobID, meetPolicy, in, transcript)
}
// runPostActions applies the policy's post-actions to a stored transcript and
// then writes the resulting text back to the job row.
//
// The steps run in order: optional LLM summary (which replaces the text used
// by the later steps when it succeeds and is non-blank), optional Drive
// upload of that text, optional Drive upload of the untouched transcript
// under a "<room>-raw" name, and optional e-mail. Each of these is
// best-effort: a failure is logged as a warning and does not abort the
// remaining steps. Only a failure of the final database update is returned.
func (p *TranscriptProcessor) runPostActions(
	ctx context.Context,
	jobID string,
	policy orgpolicy.MeetPolicy,
	in transcriptJobInput,
	rawTranscript string,
) error {
	actions := policy.PostActions
	finalText := rawTranscript

	if actions.LLMEnabled {
		summary, provider, model, usage, err := p.summarize(ctx, policy, rawTranscript)
		if err != nil {
			p.logger.Warn("llm summary failed", "error", err, "job_id", jobID)
		} else if strings.TrimSpace(summary) != "" {
			finalText = summary
			// Usage is only recorded when the organizer can be mapped to an
			// external ID; lookup failures are silently skipped.
			if extID, err := ai.ResolveExternalIDByEmail(ctx, p.db, in.OrganizerEmail); err == nil && extID != "" {
				ai.RecordFeatureUsage(ctx, p.db, extID, "ultimeet", model, provider, usage)
			}
		}
	}

	// Drive uploads need both a Nextcloud client and an organizer to upload as.
	canUseDrive := p.nc != nil && strings.TrimSpace(in.OrganizerUserID) != ""
	if actions.DriveEnabled && canUseDrive {
		if err := p.saveToDrive(ctx, in.OrganizerUserID, actions.DriveFolderPath, in.RoomID, finalText); err != nil {
			p.logger.Warn("drive save failed", "error", err, "job_id", jobID)
		}
	}
	if actions.LLMEnabled && actions.LLMThenDrive && canUseDrive {
		if err := p.saveToDrive(ctx, in.OrganizerUserID, actions.DriveFolderPath, in.RoomID+"-raw", rawTranscript); err != nil {
			p.logger.Warn("drive raw save failed", "error", err, "job_id", jobID)
		}
	}

	sendEmail := actions.EmailEnabled || (actions.LLMEnabled && actions.LLMThenEmail)
	if sendEmail {
		if recipients := p.resolveRecipients(actions, in); len(recipients) > 0 {
			if err := p.sendOrgEmail(ctx, recipients, "Transcription UltiMeet — "+in.RoomID, finalText); err != nil {
				p.logger.Warn("transcript email failed", "error", err, "job_id", jobID)
			}
		}
	}

	// Every placeholder must be referenced: the statement previously used $2
	// and $3 while passing three arguments, leaving $1 without a type, which
	// PostgreSQL rejects when the statement is prepared.
	if _, err := p.db.Exec(ctx, `
		UPDATE meet_transcript_jobs
		SET processed_transcript = $2, status = 'completed', updated_at = NOW()
		WHERE id = $1::uuid
	`, jobID, finalText); err != nil {
		return fmt.Errorf("update transcript job: %w", err)
	}
	return nil
}
// summarize asks the configured LLM provider to summarise transcript.
//
// It returns the generated content, the provider that was used, the model
// reported by the completion result and the usage details. The prompt comes
// from the policy's post-actions; a blank prompt falls back to a built-in
// French instruction. When the provider cannot be resolved all results are
// zero values; when the completion call fails the resolved provider and
// model are still returned alongside the error.
func (p *TranscriptProcessor) summarize(ctx context.Context, policy orgpolicy.MeetPolicy, transcript string) (string, llm.Provider, string, llm.UsageDetail, error) {
	post := policy.PostActions

	provider, model, err := p.resolveLLM(ctx, post.LLMProviderID)
	if err != nil {
		return "", llm.Provider{}, "", llm.UsageDetail{}, err
	}

	instruction := strings.TrimSpace(post.LLMPrompt)
	if len(instruction) == 0 {
		instruction = "Résume cette réunion en français."
	}

	completion, err := p.llm.CompleteWithUsage(ctx, provider, model, instruction, transcript)
	if err != nil {
		return "", provider, model, llm.UsageDetail{}, err
	}
	return completion.Content, provider, completion.Model, completion.Usage, nil
}
// resolveLLM looks up an LLM provider in the organisation settings row
// (org_settings, id = 1) and returns it together with its default model.
//
// providerID selects the entry under "llm.providers" by its "id"; when it is
// blank the settings' "llm.default_provider_id" is used instead. An error is
// returned when the settings cannot be read or decoded, or when no provider
// entry matches.
func (p *TranscriptProcessor) resolveLLM(ctx context.Context, providerID string) (llm.Provider, string, error) {
	var settingsJSON []byte
	row := p.db.QueryRow(ctx, `SELECT settings FROM org_settings WHERE id = 1`)
	if err := row.Scan(&settingsJSON); err != nil {
		return llm.Provider{}, "", err
	}

	settings := map[string]any{}
	if err := json.Unmarshal(settingsJSON, &settings); err != nil {
		return llm.Provider{}, "", err
	}

	// Failed type assertions leave nil maps/slices, which are safe to index
	// and range over below.
	section, _ := settings["llm"].(map[string]any)
	candidates, _ := section["providers"].([]any)

	wanted := strings.TrimSpace(providerID)
	if wanted == "" {
		wanted, _ = section["default_provider_id"].(string)
	}

	for _, candidate := range candidates {
		fields, isObject := candidate.(map[string]any)
		if !isObject {
			continue
		}
		if id, _ := fields["id"].(string); id == wanted {
			defaultModel := stringValue(fields["default_model"])
			match := llm.Provider{
				ID:           id,
				BaseURL:      stringValue(fields["base_url"]),
				APIKey:       stringValue(fields["api_key"]),
				DefaultModel: defaultModel,
			}
			return match, defaultModel, nil
		}
	}
	return llm.Provider{}, "", fmt.Errorf("llm provider not found")
}
// saveToDrive uploads content as a UTF-8 text file through the Nextcloud
// client on behalf of userID.
//
// folderPath is trimmed, defaults to "/UltiMeet/Transcripts" when blank and is
// made absolute when it lacks a leading slash. The file is named
// "<sanitized room>-<UTC timestamp>.txt".
func (p *TranscriptProcessor) saveToDrive(ctx context.Context, userID, folderPath, roomID, content string) error {
	dir := strings.TrimSpace(folderPath)
	switch {
	case dir == "":
		dir = "/UltiMeet/Transcripts"
	case !strings.HasPrefix(dir, "/"):
		dir = "/" + dir
	}

	stamp := time.Now().UTC().Format("20060102-150405")
	target := path.Join(dir, sanitizeFileName(roomID)+"-"+stamp+".txt")

	return p.nc.Upload(ctx, userID, target, strings.NewReader(content), "text/plain; charset=utf-8")
}
// resolveRecipients builds the e-mail recipient list selected by
// actions.EmailRecipients: "participants", "both" (organizer first, then
// participants), "custom" (comma-separated actions.EmailCustomAddresses), or
// the organizer alone for any other value.
//
// Addresses are trimmed and lower-cased; blanks and duplicates are dropped and
// first-seen order is kept.
func (p *TranscriptProcessor) resolveRecipients(actions orgpolicy.MeetPostActions, in transcriptJobInput) []string {
	recipients := make([]string, 0, 8)
	known := make(map[string]struct{})

	include := func(raw string) {
		addr := strings.ToLower(strings.TrimSpace(raw))
		if addr == "" {
			return
		}
		if _, dup := known[addr]; !dup {
			known[addr] = struct{}{}
			recipients = append(recipients, addr)
		}
	}
	includeParticipants := func() {
		for _, participant := range in.ParticipantEmails {
			include(participant)
		}
	}

	switch actions.EmailRecipients {
	case "participants":
		includeParticipants()
	case "both":
		include(in.OrganizerEmail)
		includeParticipants()
	case "custom":
		for custom := range strings.SplitSeq(actions.EmailCustomAddresses, ",") {
			include(custom)
		}
	default:
		include(in.OrganizerEmail)
	}
	return recipients
}
// sendOrgEmail sends a plain-text message to the given recipients using the
// SMTP settings stored under "mailing" in the organisation settings row
// (org_settings, id = 1).
//
// It returns an error when the settings cannot be read or decoded, when
// mailing is missing or disabled, when smtp_host or from_email is not
// configured, or when the SMTP exchange fails. The port defaults to 587.
// Authentication (PLAIN) is only attempted when a user name is configured and
// tls_mode is not "none".
//
// smtp.SendMail takes no context, so ctx only bounds the settings query.
func (p *TranscriptProcessor) sendOrgEmail(ctx context.Context, to []string, subject, body string) error {
	var raw []byte
	if err := p.db.QueryRow(ctx, `SELECT settings FROM org_settings WHERE id = 1`).Scan(&raw); err != nil {
		return fmt.Errorf("load org settings: %w", err)
	}
	stored := map[string]any{}
	if err := json.Unmarshal(raw, &stored); err != nil {
		return fmt.Errorf("decode org settings: %w", err)
	}

	mailing, _ := stored["mailing"].(map[string]any)
	if mailing == nil || !boolValue(mailing["enabled"]) {
		return fmt.Errorf("org mailing disabled")
	}

	// An empty host would make the dial address ":<port>", which silently
	// targets the local machine; treat it as a configuration error instead.
	host := strings.TrimSpace(stringValue(mailing["smtp_host"]))
	if host == "" {
		return fmt.Errorf("mailing smtp_host missing")
	}
	from := stringValue(mailing["from_email"])
	if from == "" {
		return fmt.Errorf("mailing from_email missing")
	}

	port := intValue(mailing["smtp_port"], 587)
	user := stringValue(mailing["smtp_user"])
	pass := stringValue(mailing["smtp_password"])
	fromName := stringValue(mailing["from_name"])
	tlsMode := stringValue(mailing["tls_mode"])

	// Leave auth nil for unauthenticated relays: with a non-nil Auth,
	// smtp.SendMail fails against servers that do not advertise AUTH, and
	// PLAIN with an empty user name cannot succeed anyway.
	var auth smtp.Auth
	if tlsMode != "none" && user != "" {
		auth = smtp.PlainAuth("", user, pass, host)
	}

	addr := fmt.Sprintf("%s:%d", host, port)
	msg := buildPlainEmail(from, fromName, to, subject, body)
	if err := smtp.SendMail(addr, auth, from, to, msg); err != nil {
		return fmt.Errorf("send mail via %s: %w", addr, err)
	}
	return nil
}
// buildPlainEmail renders a minimal UTF-8 text/plain message with From, To,
// Subject and MIME headers followed by body.
//
// fromName, when non-blank, is used as the display name in the From header.
// Carriage returns and line feeds inside any header value are collapsed to a
// single space so a value (for example a room ID embedded in the subject)
// cannot inject additional headers or terminate the header section early.
// The body is written unchanged.
//
// NOTE(review): non-ASCII header text is still written raw; RFC 2047 encoding
// would require importing the mime package.
func buildPlainEmail(from, fromName string, to []string, subject, body string) []byte {
	// "\r\n" is listed first so a CRLF pair becomes one space, not two.
	oneLine := strings.NewReplacer("\r\n", " ", "\r", " ", "\n", " ").Replace

	sender := oneLine(from)
	fromHeader := sender
	if display := oneLine(fromName); strings.TrimSpace(display) != "" {
		fromHeader = fmt.Sprintf("%s <%s>", display, sender)
	}

	recipients := make([]string, 0, len(to))
	for _, addr := range to {
		recipients = append(recipients, oneLine(addr))
	}

	var buf bytes.Buffer
	buf.WriteString("From: " + fromHeader + "\r\n")
	buf.WriteString("To: " + strings.Join(recipients, ", ") + "\r\n")
	buf.WriteString("Subject: " + oneLine(subject) + "\r\n")
	buf.WriteString("MIME-Version: 1.0\r\n")
	buf.WriteString("Content-Type: text/plain; charset=UTF-8\r\n")
	buf.WriteString("\r\n")
	buf.WriteString(body)
	return buf.Bytes()
}
// sanitizeFileName returns s with every rune other than ASCII letters,
// digits, '-' and '_' replaced by '-'. An empty input yields "room".
func sanitizeFileName(s string) string {
	if s == "" {
		return "room"
	}

	var cleaned strings.Builder
	cleaned.Grow(len(s))
	for _, r := range s {
		allowed := r == '-' || r == '_' ||
			('a' <= r && r <= 'z') ||
			('A' <= r && r <= 'Z') ||
			('0' <= r && r <= '9')
		if allowed {
			cleaned.WriteRune(r)
		} else {
			cleaned.WriteByte('-')
		}
	}
	return cleaned.String()
}
// nullIfEmpty returns nil when s is empty or whitespace-only, and s itself
// (untrimmed) otherwise, so blank strings can be passed as SQL NULL.
func nullIfEmpty(s string) any {
	if trimmed := strings.TrimSpace(s); len(trimmed) > 0 {
		return s
	}
	return nil
}
// boolValue returns v when it holds a bool and false for any other value,
// including nil.
func boolValue(v any) bool {
	if flag, isBool := v.(bool); isBool {
		return flag
	}
	return false
}
// stringValue returns v when it holds a string and "" for any other value,
// including nil.
func stringValue(v any) string {
	if text, isString := v.(string); isString {
		return text
	}
	return ""
}
// intValue extracts a strictly positive integer from v, which may be a
// float64 (the type encoding/json uses for numbers decoded into any) or an
// int. Floats are truncated toward zero. fallback is returned for every other
// type and for values that do not yield a positive int: zero, negatives, NaN,
// fractions below 1 (which would truncate to 0) and floats too large for int.
func intValue(v any, fallback int) int {
	const maxInt = int(^uint(0) >> 1)

	switch t := v.(type) {
	case float64:
		// t >= 1 rejects NaN and fractions that truncate to 0; the upper
		// bound avoids an out-of-range float-to-int conversion.
		if t >= 1 && t < float64(maxInt) {
			return int(t)
		}
	case int:
		if t > 0 {
			return t
		}
	}
	return fallback
}