- Added a new endpoint for simulating rules based on sample messages, allowing users to test rule conditions and actions. - Enhanced webhook management with versioning, preview capabilities, and improved validation for webhook requests. - Updated service interfaces to support new functionalities, including max retries for webhooks and signing secrets. - Implemented observability metrics for webhook retries and dead-letter tracking, improving error handling and monitoring. - Enhanced unit tests to cover new simulation and webhook features, ensuring robust functionality and validation.
684 lines
20 KiB
Go
684 lines
20 KiB
Go
package mail
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/ultisuite/ulti-backend/internal/api/query"
|
|
"github.com/ultisuite/ulti-backend/internal/mail/credentials"
|
|
"github.com/ultisuite/ulti-backend/internal/mail/sanitize"
|
|
"github.com/ultisuite/ulti-backend/internal/mail/storage"
|
|
"github.com/ultisuite/ulti-backend/internal/mail/threading"
|
|
"github.com/ultisuite/ulti-backend/internal/securityaudit"
|
|
)
|
|
|
|
var (
|
|
ErrNotFound = errors.New("not found")
|
|
ErrUserNotProvisioned = errors.New("user not provisioned")
|
|
ErrAccountNotFound = errors.New("account not found")
|
|
ErrCredentialsUnavailable = errors.New("credentials encryption unavailable")
|
|
)
|
|
|
|
type Service struct {
|
|
db *pgxpool.Pool
|
|
credentials *credentials.Manager
|
|
audit *securityaudit.Logger
|
|
storage *storage.Client
|
|
attachmentsBucket string
|
|
logger *slog.Logger
|
|
}
|
|
|
|
func NewService(db *pgxpool.Pool, audit *securityaudit.Logger, credentialManager *credentials.Manager, objectStorage *storage.Client, attachmentsBucket string) *Service {
|
|
return &Service{
|
|
db: db,
|
|
credentials: credentialManager,
|
|
audit: audit,
|
|
storage: objectStorage,
|
|
attachmentsBucket: attachmentsBucket,
|
|
logger: slog.Default().With("component", "mail-service"),
|
|
}
|
|
}
|
|
|
|
func (s *Service) ResolveUserID(ctx context.Context, externalID string) (string, error) {
|
|
var userID string
|
|
err := s.db.QueryRow(ctx, `SELECT id FROM users WHERE external_id = $1`, externalID).Scan(&userID)
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return "", ErrUserNotProvisioned
|
|
}
|
|
return "", err
|
|
}
|
|
return userID, nil
|
|
}
|
|
|
|
type AccountsList struct {
|
|
Accounts []map[string]any `json:"accounts"`
|
|
Pagination query.PaginationMeta `json:"pagination,omitempty"`
|
|
}
|
|
|
|
func (s *Service) ListAccounts(ctx context.Context, externalID string, params query.ListParams) (AccountsList, error) {
|
|
var total int64
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT COUNT(*) FROM mail_accounts
|
|
WHERE user_id = (SELECT id FROM users WHERE external_id = $1)
|
|
`, externalID).Scan(&total)
|
|
if err != nil {
|
|
return AccountsList{}, err
|
|
}
|
|
|
|
rows, err := s.db.Query(ctx, `
|
|
SELECT id, name, email, provider, imap_host, smtp_host, is_active, last_sync_at, created_at
|
|
FROM mail_accounts WHERE user_id = (SELECT id FROM users WHERE external_id = $1)
|
|
ORDER BY created_at ASC
|
|
LIMIT $2 OFFSET $3
|
|
`, externalID, params.Limit(), params.Offset())
|
|
if err != nil {
|
|
return AccountsList{}, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
accounts := make([]map[string]any, 0)
|
|
for rows.Next() {
|
|
var id, name, email, provider, imapHost, smtpHost string
|
|
var isActive bool
|
|
var lastSync, createdAt any
|
|
if err := rows.Scan(&id, &name, &email, &provider, &imapHost, &smtpHost, &isActive, &lastSync, &createdAt); err != nil {
|
|
return AccountsList{}, err
|
|
}
|
|
accounts = append(accounts, map[string]any{
|
|
"id": id, "name": name, "email": email, "provider": provider,
|
|
"imap_host": imapHost, "smtp_host": smtpHost, "is_active": isActive,
|
|
"last_sync_at": lastSync, "created_at": createdAt,
|
|
})
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return AccountsList{}, err
|
|
}
|
|
|
|
return AccountsList{
|
|
Accounts: accounts,
|
|
Pagination: params.Meta(&total),
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) CreateAccount(ctx context.Context, externalID string, req *createAccountRequest) (string, error) {
|
|
if s.credentials == nil {
|
|
return "", ErrCredentialsUnavailable
|
|
}
|
|
creds, err := s.credentials.Encrypt(req.Username, req.Password)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
var id string
|
|
err = s.db.QueryRow(ctx, `
|
|
INSERT INTO mail_accounts (user_id, name, email, provider, imap_host, imap_port, imap_tls, smtp_host, smtp_port, smtp_tls, credentials)
|
|
VALUES ((SELECT id FROM users WHERE external_id = $1), $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
|
RETURNING id
|
|
`, externalID, req.Name, req.Email, req.Provider, req.IMAPHost, req.IMAPPort, req.IMAPTLS, req.SMTPHost, req.SMTPPort, req.SMTPTLS, creds).Scan(&id)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
func (s *Service) GetAccount(ctx context.Context, externalID, accountID string) (map[string]any, error) {
|
|
var id, name, email, provider string
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT id, name, email, provider FROM mail_accounts
|
|
WHERE id = $1 AND user_id = (SELECT id FROM users WHERE external_id = $2)
|
|
`, accountID, externalID).Scan(&id, &name, &email, &provider)
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
return map[string]any{"id": id, "name": name, "email": email, "provider": provider}, nil
|
|
}
|
|
|
|
func (s *Service) DeleteAccount(ctx context.Context, externalID, accountID string) error {
|
|
userID, err := s.ResolveUserID(ctx, externalID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
result, err := s.db.Exec(ctx, `DELETE FROM mail_accounts WHERE id = $1 AND user_id = $2`, accountID, userID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if result.RowsAffected() == 0 {
|
|
return ErrNotFound
|
|
}
|
|
if s.audit != nil {
|
|
s.audit.Log(ctx, externalID, securityaudit.ActionCriticalDeletion, map[string]any{
|
|
"target": "mail_account", "account_id": accountID,
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type MessageListFilter struct {
|
|
Folder string
|
|
AccountID string
|
|
}
|
|
|
|
type MessagesList struct {
|
|
Messages []map[string]any `json:"messages"`
|
|
Page int `json:"page"`
|
|
Pagination query.PaginationMeta `json:"pagination,omitempty"`
|
|
}
|
|
|
|
func (s *Service) ListMessages(ctx context.Context, externalID string, filter MessageListFilter, params query.ListParams) (MessagesList, error) {
|
|
baseQuery := `
|
|
FROM messages m
|
|
JOIN mail_accounts ma ON m.account_id = ma.id
|
|
WHERE ma.user_id = (SELECT id FROM users WHERE external_id = $1)
|
|
`
|
|
args := []any{externalID}
|
|
argIdx := 2
|
|
|
|
if filter.AccountID != "" {
|
|
baseQuery += fmt.Sprintf(" AND m.account_id = $%d", argIdx)
|
|
args = append(args, filter.AccountID)
|
|
argIdx++
|
|
}
|
|
if filter.Folder != "" {
|
|
baseQuery += fmt.Sprintf(" AND m.folder_id = (SELECT id FROM mail_folders WHERE name = $%d AND account_id = m.account_id LIMIT 1)", argIdx)
|
|
args = append(args, filter.Folder)
|
|
argIdx++
|
|
}
|
|
|
|
var total int64
|
|
countQuery := "SELECT COUNT(*) " + baseQuery
|
|
if err := s.db.QueryRow(ctx, countQuery, args...).Scan(&total); err != nil {
|
|
return MessagesList{}, err
|
|
}
|
|
|
|
listQuery := `
|
|
SELECT m.id, m.message_id, m.thread_id, m.subject, m.from_addr, m.to_addrs, m.date, m.snippet, m.flags, m.labels, m.has_attachments
|
|
` + baseQuery + fmt.Sprintf(" ORDER BY m.date DESC LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
|
|
args = append(args, params.Limit(), params.Offset())
|
|
|
|
rows, err := s.db.Query(ctx, listQuery, args...)
|
|
if err != nil {
|
|
return MessagesList{}, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
messages := make([]map[string]any, 0)
|
|
for rows.Next() {
|
|
var id, messageID, subject, snippet string
|
|
var threadID *string
|
|
var fromAddr, toAddrs []byte
|
|
var date any
|
|
var flags, labels []string
|
|
var hasAttachments bool
|
|
if err := rows.Scan(&id, &messageID, &threadID, &subject, &fromAddr, &toAddrs, &date, &snippet, &flags, &labels, &hasAttachments); err != nil {
|
|
return MessagesList{}, err
|
|
}
|
|
entry := map[string]any{
|
|
"id": id, "message_id": messageID, "subject": subject, "from": json.RawMessage(fromAddr),
|
|
"to": json.RawMessage(toAddrs), "date": date, "snippet": snippet,
|
|
"flags": flags, "labels": labels, "has_attachments": hasAttachments,
|
|
}
|
|
if threadID != nil {
|
|
entry["thread_id"] = *threadID
|
|
}
|
|
messages = append(messages, entry)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return MessagesList{}, err
|
|
}
|
|
|
|
return MessagesList{
|
|
Messages: messages,
|
|
Page: params.Page,
|
|
Pagination: params.Meta(&total),
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) GetMessage(ctx context.Context, externalID, messageID string) (map[string]any, error) {
|
|
var msg struct {
|
|
ID string
|
|
MessageID string
|
|
ThreadID *string
|
|
InReplyTo string
|
|
References []string
|
|
Subject string
|
|
From []byte
|
|
To []byte
|
|
Cc []byte
|
|
Date any
|
|
Text string
|
|
HTML string
|
|
Flags []string
|
|
Labels []string
|
|
}
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT m.id, m.message_id, m.thread_id, m.in_reply_to, m.references_header,
|
|
m.subject, m.from_addr, m.to_addrs, m.cc_addrs, m.date,
|
|
m.body_text, m.body_html, m.flags, m.labels
|
|
FROM messages m JOIN mail_accounts ma ON m.account_id = ma.id
|
|
WHERE m.id = $1 AND ma.user_id = (SELECT id FROM users WHERE external_id = $2)
|
|
`, messageID, externalID).Scan(
|
|
&msg.ID, &msg.MessageID, &msg.ThreadID, &msg.InReplyTo, &msg.References,
|
|
&msg.Subject, &msg.From, &msg.To, &msg.Cc, &msg.Date,
|
|
&msg.Text, &msg.HTML, &msg.Flags, &msg.Labels,
|
|
)
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
out := map[string]any{
|
|
"id": msg.ID, "message_id": msg.MessageID, "subject": msg.Subject,
|
|
"from": json.RawMessage(msg.From), "to": json.RawMessage(msg.To), "cc": json.RawMessage(msg.Cc),
|
|
"date": msg.Date, "body_text": msg.Text, "body_html": sanitize.SanitizeHTML(msg.HTML),
|
|
"flags": msg.Flags, "labels": msg.Labels,
|
|
"in_reply_to": msg.InReplyTo, "references": msg.References,
|
|
}
|
|
if msg.ThreadID != nil {
|
|
out["thread_id"] = *msg.ThreadID
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (s *Service) UpdateLabels(ctx context.Context, externalID, messageID string, labels []string) error {
|
|
result, err := s.db.Exec(ctx, `
|
|
UPDATE messages m
|
|
SET labels = $1, updated_at = NOW()
|
|
FROM mail_accounts ma
|
|
WHERE m.id = $2
|
|
AND m.account_id = ma.id
|
|
AND ma.user_id = (SELECT id FROM users WHERE external_id = $3)
|
|
`, labels, messageID, externalID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if result.RowsAffected() == 0 {
|
|
return ErrNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) UpdateFlags(ctx context.Context, externalID, messageID string, flags []string) error {
|
|
result, err := s.db.Exec(ctx, `
|
|
UPDATE messages m
|
|
SET flags = $1, updated_at = NOW()
|
|
FROM mail_accounts ma
|
|
WHERE m.id = $2
|
|
AND m.account_id = ma.id
|
|
AND ma.user_id = (SELECT id FROM users WHERE external_id = $3)
|
|
`, flags, messageID, externalID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if result.RowsAffected() == 0 {
|
|
return ErrNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) DeleteMessage(ctx context.Context, externalID, messageID string) error {
|
|
result, err := s.db.Exec(ctx, `
|
|
DELETE FROM messages m
|
|
USING mail_accounts ma
|
|
WHERE m.id = $1
|
|
AND m.account_id = ma.id
|
|
AND ma.user_id = (SELECT id FROM users WHERE external_id = $2)
|
|
`, messageID, externalID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if result.RowsAffected() == 0 {
|
|
return ErrNotFound
|
|
}
|
|
if s.audit != nil {
|
|
s.audit.Log(ctx, externalID, securityaudit.ActionCriticalDeletion, map[string]any{
|
|
"target": "message", "message_id": messageID,
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) GetThread(ctx context.Context, externalID, threadID string) (map[string]any, error) {
|
|
rows, err := s.db.Query(ctx, `
|
|
SELECT m.id, m.subject, m.from_addr, m.date, m.snippet, m.flags
|
|
FROM messages m JOIN mail_accounts ma ON m.account_id = ma.id
|
|
WHERE m.thread_id = $1 AND ma.user_id = (SELECT id FROM users WHERE external_id = $2)
|
|
ORDER BY m.date ASC
|
|
`, threadID, externalID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
messages := make([]map[string]any, 0)
|
|
for rows.Next() {
|
|
var id, subject, snippet string
|
|
var from []byte
|
|
var date any
|
|
var flags []string
|
|
if err := rows.Scan(&id, &subject, &from, &date, &snippet, &flags); err != nil {
|
|
return nil, err
|
|
}
|
|
messages = append(messages, map[string]any{
|
|
"id": id, "subject": subject, "from": json.RawMessage(from),
|
|
"date": date, "snippet": snippet, "flags": flags,
|
|
})
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return map[string]any{"thread_id": threadID, "messages": messages}, nil
|
|
}
|
|
|
|
type replyParent struct {
|
|
MessageID string
|
|
References []string
|
|
}
|
|
|
|
func (s *Service) loadReplyParent(ctx context.Context, userID, replyToMessageID string) (*replyParent, error) {
|
|
var parent replyParent
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT m.message_id, m.references_header
|
|
FROM messages m
|
|
JOIN mail_accounts ma ON m.account_id = ma.id
|
|
WHERE m.id = $1 AND ma.user_id = $2
|
|
`, replyToMessageID, userID).Scan(&parent.MessageID, &parent.References)
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
return &parent, nil
|
|
}
|
|
|
|
func (s *Service) SendMessage(ctx context.Context, userID string, req *sendMessageRequest) (id, status string, err error) {
|
|
if req.IdempotencyKey != "" {
|
|
err = s.db.QueryRow(ctx, `
|
|
SELECT id, status FROM outbox
|
|
WHERE user_id = $1 AND idempotency_key = $2
|
|
AND created_at > NOW() - INTERVAL '24 hours'
|
|
ORDER BY created_at DESC
|
|
LIMIT 1
|
|
`, userID, req.IdempotencyKey).Scan(&id, &status)
|
|
if err == nil {
|
|
return id, status, nil
|
|
}
|
|
if !errors.Is(err, pgx.ErrNoRows) {
|
|
return "", "", err
|
|
}
|
|
}
|
|
|
|
toJSON, _ := json.Marshal(req.To)
|
|
ccJSON, _ := json.Marshal(req.Cc)
|
|
bccJSON, _ := json.Marshal(req.Bcc)
|
|
|
|
inReplyTo := threading.NormalizeMessageID(req.InReplyTo)
|
|
var references []string
|
|
|
|
if req.ReplyToMessageID != "" {
|
|
parent, err := s.loadReplyParent(ctx, userID, req.ReplyToMessageID)
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
inReplyTo = threading.NormalizeMessageID(parent.MessageID)
|
|
references = threading.BuildReferences(parent.MessageID, parent.References)
|
|
}
|
|
|
|
status = "queued"
|
|
if req.ScheduleAt != nil {
|
|
status = "scheduled"
|
|
}
|
|
|
|
err = s.db.QueryRow(ctx, `
|
|
INSERT INTO outbox (
|
|
user_id, account_id, to_addrs, cc_addrs, bcc_addrs, subject,
|
|
body_text, body_html, in_reply_to, references_header, status, scheduled_at, idempotency_key
|
|
)
|
|
SELECT $1, ma.id, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
|
|
FROM mail_accounts ma
|
|
WHERE ma.id = $2 AND ma.user_id = $1
|
|
RETURNING id
|
|
`, userID, req.AccountID, toJSON, ccJSON, bccJSON, req.Subject, req.BodyText, req.BodyHTML,
|
|
inReplyTo, references, status, req.ScheduleAt, req.IdempotencyKey).Scan(&id)
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return "", "", ErrAccountNotFound
|
|
}
|
|
if req.IdempotencyKey != "" && isUniqueViolation(err) {
|
|
err = s.db.QueryRow(ctx, `
|
|
SELECT id, status FROM outbox
|
|
WHERE user_id = $1 AND idempotency_key = $2
|
|
ORDER BY created_at DESC
|
|
LIMIT 1
|
|
`, userID, req.IdempotencyKey).Scan(&id, &status)
|
|
if err == nil {
|
|
return id, status, nil
|
|
}
|
|
}
|
|
return "", "", err
|
|
}
|
|
return id, status, nil
|
|
}
|
|
|
|
type RulesList struct {
|
|
Rules []map[string]any `json:"rules"`
|
|
Pagination query.PaginationMeta `json:"pagination,omitempty"`
|
|
}
|
|
|
|
func (s *Service) ListRules(ctx context.Context, externalID string, params query.ListParams) (RulesList, error) {
|
|
var total int64
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT COUNT(*) FROM mail_rules WHERE user_id = (SELECT id FROM users WHERE external_id = $1)
|
|
`, externalID).Scan(&total)
|
|
if err != nil {
|
|
return RulesList{}, err
|
|
}
|
|
|
|
rows, err := s.db.Query(ctx, `
|
|
SELECT id, name, priority, conditions, actions, is_active, match_count
|
|
FROM mail_rules WHERE user_id = (SELECT id FROM users WHERE external_id = $1)
|
|
ORDER BY priority ASC
|
|
LIMIT $2 OFFSET $3
|
|
`, externalID, params.Limit(), params.Offset())
|
|
if err != nil {
|
|
return RulesList{}, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
rules := make([]map[string]any, 0)
|
|
for rows.Next() {
|
|
var id, name string
|
|
var priority int
|
|
var conditions, actions []byte
|
|
var isActive bool
|
|
var matchCount int64
|
|
if err := rows.Scan(&id, &name, &priority, &conditions, &actions, &isActive, &matchCount); err != nil {
|
|
return RulesList{}, err
|
|
}
|
|
rules = append(rules, map[string]any{
|
|
"id": id, "name": name, "priority": priority,
|
|
"conditions": json.RawMessage(conditions), "actions": json.RawMessage(actions),
|
|
"is_active": isActive, "match_count": matchCount,
|
|
})
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return RulesList{}, err
|
|
}
|
|
|
|
return RulesList{
|
|
Rules: rules,
|
|
Pagination: params.Meta(&total),
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) CreateRule(ctx context.Context, userID string, req *createRuleRequest) (string, error) {
|
|
condJSON, _ := json.Marshal(req.Conditions)
|
|
actJSON, _ := json.Marshal(req.Actions)
|
|
|
|
if req.AccountID != "" {
|
|
var exists bool
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT EXISTS(SELECT 1 FROM mail_accounts WHERE id = $1 AND user_id = $2)
|
|
`, req.AccountID, userID).Scan(&exists)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if !exists {
|
|
return "", ErrAccountNotFound
|
|
}
|
|
}
|
|
|
|
var id string
|
|
err := s.db.QueryRow(ctx, `
|
|
INSERT INTO mail_rules (user_id, account_id, name, priority, conditions, actions)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
RETURNING id
|
|
`, userID, nilIfEmpty(req.AccountID), req.Name, req.Priority, condJSON, actJSON).Scan(&id)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
func (s *Service) UpdateRule(ctx context.Context, externalID, ruleID string, req *updateRuleRequest) error {
|
|
condJSON, _ := json.Marshal(req.Conditions)
|
|
actJSON, _ := json.Marshal(req.Actions)
|
|
|
|
result, err := s.db.Exec(ctx, `
|
|
UPDATE mail_rules SET name=$1, priority=$2, is_active=$3, conditions=$4, actions=$5, updated_at=NOW()
|
|
WHERE id=$6 AND user_id=(SELECT id FROM users WHERE external_id=$7)
|
|
`, req.Name, req.Priority, req.IsActive, condJSON, actJSON, ruleID, externalID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if result.RowsAffected() == 0 {
|
|
return ErrNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) DeleteRule(ctx context.Context, externalID, ruleID string) error {
|
|
result, err := s.db.Exec(ctx, `
|
|
DELETE FROM mail_rules
|
|
WHERE id = $1 AND user_id = (SELECT id FROM users WHERE external_id = $2)
|
|
`, ruleID, externalID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if result.RowsAffected() == 0 {
|
|
return ErrNotFound
|
|
}
|
|
if s.audit != nil {
|
|
s.audit.Log(ctx, externalID, securityaudit.ActionCriticalDeletion, map[string]any{
|
|
"target": "mail_rule", "rule_id": ruleID,
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type WebhooksList struct {
|
|
Webhooks []map[string]any `json:"webhooks"`
|
|
Pagination query.PaginationMeta `json:"pagination,omitempty"`
|
|
}
|
|
|
|
func (s *Service) ListWebhooks(ctx context.Context, externalID string, params query.ListParams) (WebhooksList, error) {
|
|
var total int64
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT COUNT(*) FROM webhook_templates
|
|
WHERE user_id = (SELECT id FROM users WHERE external_id = $1)
|
|
`, externalID).Scan(&total)
|
|
if err != nil {
|
|
return WebhooksList{}, err
|
|
}
|
|
|
|
rows, err := s.db.Query(ctx, `
|
|
SELECT id, name, url, method, version, is_active FROM webhook_templates
|
|
WHERE user_id = (SELECT id FROM users WHERE external_id = $1)
|
|
ORDER BY created_at ASC
|
|
LIMIT $2 OFFSET $3
|
|
`, externalID, params.Limit(), params.Offset())
|
|
if err != nil {
|
|
return WebhooksList{}, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
webhooks := make([]map[string]any, 0)
|
|
for rows.Next() {
|
|
var id, name, url, method string
|
|
var version int
|
|
var isActive bool
|
|
if err := rows.Scan(&id, &name, &url, &method, &version, &isActive); err != nil {
|
|
return WebhooksList{}, err
|
|
}
|
|
webhooks = append(webhooks, map[string]any{
|
|
"id": id, "name": name, "url": url, "method": method, "version": version, "is_active": isActive,
|
|
})
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return WebhooksList{}, err
|
|
}
|
|
|
|
return WebhooksList{
|
|
Webhooks: webhooks,
|
|
Pagination: params.Meta(&total),
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) CreateWebhook(ctx context.Context, externalID string, req *createWebhookRequest, method string, maxRetries int) (string, error) {
|
|
headersJSON, _ := json.Marshal(req.Headers)
|
|
|
|
var id string
|
|
err := s.db.QueryRow(ctx, `
|
|
INSERT INTO webhook_templates (user_id, name, url, method, headers, body_template, version, signing_secret, max_retries)
|
|
VALUES ((SELECT id FROM users WHERE external_id = $1), $2, $3, $4, $5, $6, 1, $7, $8)
|
|
RETURNING id
|
|
`, externalID, req.Name, req.URL, method, headersJSON, req.BodyTemplate, req.SigningSecret, maxRetries).Scan(&id)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if _, err := s.db.Exec(ctx, `
|
|
INSERT INTO webhook_template_versions (template_id, version, method, headers, body_template)
|
|
VALUES ($1, 1, $2, $3, $4)
|
|
`, id, method, headersJSON, req.BodyTemplate); err != nil {
|
|
return "", err
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
func (s *Service) DeleteWebhook(ctx context.Context, externalID, webhookID string) error {
|
|
result, err := s.db.Exec(ctx, `
|
|
DELETE FROM webhook_templates
|
|
WHERE id = $1 AND user_id = (SELECT id FROM users WHERE external_id = $2)
|
|
`, webhookID, externalID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if result.RowsAffected() == 0 {
|
|
return ErrNotFound
|
|
}
|
|
if s.audit != nil {
|
|
s.audit.Log(ctx, externalID, securityaudit.ActionCriticalDeletion, map[string]any{
|
|
"target": "webhook_template", "webhook_id": webhookID,
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func nilIfEmpty(s string) any {
|
|
if s == "" {
|
|
return nil
|
|
}
|
|
return s
|
|
}
|