177 lines
5.7 KiB
TypeScript
177 lines
5.7 KiB
TypeScript
/**
|
||
* Email-Queue: Speichert fehlgeschlagene E-Mails in der Datenbank
|
||
* und versucht sie regelmäßig erneut zu senden.
|
||
*
|
||
* WICHTIG: Kein process.env.NEXT_PUBLIC_* verwenden – diese Werte
|
||
* werden beim Build eingebettet und enthalten die öffentliche URL
|
||
* (supabase.demo.mbo-tech-it.de), die vom Docker-Container aus nicht
|
||
* erreichbar ist. Stattdessen createServiceClient() nutzen, der intern
|
||
* SUPABASE_INTERNAL_URL verwendet (Runtime-Variable, nie eingebettet).
|
||
*
|
||
* Retry-Strategie: exponentielles Backoff
|
||
* Versuch 1 → sofort
|
||
* Versuch 2 → 1 Min
|
||
* Versuch 3 → 2 Min
|
||
* Versuch 4 → 4 Min
|
||
* ...bis max. 60 Min zwischen Versuchen, dann Status "failed"
|
||
*/
|
||
|
||
import nodemailer from "nodemailer";
|
||
import { createServiceClient } from "./supabase";
|
||
|
||
export interface QueuedMail {
|
||
mail_from: string;
|
||
mail_to: string;
|
||
reply_to?: string;
|
||
subject: string;
|
||
html: string;
|
||
body_text: string;
|
||
}
|
||
|
||
// ─── Datenbank-Operationen via Supabase-Client (interne URL) ──────────────
|
||
|
||
async function dbInsert(mail: QueuedMail) {
|
||
const db = createServiceClient();
|
||
const { error } = await db.from("email_queue").insert({
|
||
...mail,
|
||
status: "pending",
|
||
retry_count: 0,
|
||
next_retry_at: new Date().toISOString(),
|
||
});
|
||
if (error) throw new Error(`Queue-Insert fehlgeschlagen: ${error.message}`);
|
||
}
|
||
|
||
async function dbFetchPending(): Promise<
|
||
Array<QueuedMail & { id: string; retry_count: number }>
|
||
> {
|
||
const db = createServiceClient();
|
||
const now = new Date().toISOString();
|
||
const { data, error } = await db
|
||
.from("email_queue")
|
||
.select("*")
|
||
.eq("status", "pending")
|
||
.lte("next_retry_at", now)
|
||
.order("next_retry_at", { ascending: true })
|
||
.limit(20);
|
||
|
||
if (error) {
|
||
console.error("[EmailQueue] Fetch pending fehlgeschlagen:", error.message);
|
||
return [];
|
||
}
|
||
return (data ?? []) as Array<QueuedMail & { id: string; retry_count: number }>;
|
||
}
|
||
|
||
async function dbMarkSent(id: string) {
|
||
const db = createServiceClient();
|
||
await db
|
||
.from("email_queue")
|
||
.update({ status: "sent", error_last: null })
|
||
.eq("id", id);
|
||
}
|
||
|
||
async function dbMarkRetry(id: string, retryCount: number, error: string) {
|
||
const db = createServiceClient();
|
||
const nextCount = retryCount + 1;
|
||
const maxRetries = 10;
|
||
|
||
if (nextCount >= maxRetries) {
|
||
await db
|
||
.from("email_queue")
|
||
.update({ status: "failed", retry_count: nextCount, error_last: error.slice(0, 500) })
|
||
.eq("id", id);
|
||
console.error(
|
||
`[EmailQueue] Mail ${id} endgültig fehlgeschlagen nach ${nextCount} Versuchen: ${error}`
|
||
);
|
||
return;
|
||
}
|
||
|
||
// Exponentielles Backoff: 1, 2, 4, 8, 16, 32, 60, 60, 60... Minuten
|
||
const minutesDelay = Math.min(Math.pow(2, nextCount - 1), 60);
|
||
const nextRetry = new Date(Date.now() + minutesDelay * 60 * 1000).toISOString();
|
||
|
||
await db
|
||
.from("email_queue")
|
||
.update({ retry_count: nextCount, next_retry_at: nextRetry, error_last: error.slice(0, 500) })
|
||
.eq("id", id);
|
||
|
||
console.warn(
|
||
`[EmailQueue] Versuch ${nextCount}/${maxRetries} fehlgeschlagen – nächster Retry in ${minutesDelay} Min. Fehler: ${error}`
|
||
);
|
||
}
|
||
|
||
// ─── Queue-Eintrag speichern ───────────────────────────────────────────────
|
||
|
||
export async function queueEmail(mail: QueuedMail): Promise<void> {
|
||
try {
|
||
await dbInsert(mail);
|
||
console.log(`[EmailQueue] "${mail.subject}" → "${mail.mail_to}" in Queue gespeichert`);
|
||
// Sofort-Retry: nicht auf das 60s-Intervall warten
|
||
processQueue().catch(() => {});
|
||
} catch (err) {
|
||
console.error("[EmailQueue] Konnte Mail NICHT in Queue speichern:", err);
|
||
}
|
||
}
|
||
|
||
// ─── Worker: Pending Mails senden ─────────────────────────────────────────
|
||
|
||
let transporter: nodemailer.Transporter | null = null;
|
||
|
||
function getTransporter() {
|
||
if (!transporter) {
|
||
transporter = nodemailer.createTransport({
|
||
host: process.env.SMTP_HOST,
|
||
port: Number(process.env.SMTP_PORT ?? 587),
|
||
secure: false,
|
||
auth: { user: process.env.SMTP_USER, pass: process.env.SMTP_PASS },
|
||
connectionTimeout: 15000,
|
||
greetingTimeout: 10000,
|
||
socketTimeout: 20000,
|
||
tls: { rejectUnauthorized: false, ciphers: "SSLv3" },
|
||
});
|
||
}
|
||
return transporter;
|
||
}
|
||
|
||
async function processQueue(): Promise<void> {
|
||
const pending = await dbFetchPending();
|
||
if (pending.length === 0) return;
|
||
|
||
console.log(`[EmailQueue] Verarbeite ${pending.length} ausstehende Mail(s)...`);
|
||
|
||
for (const mail of pending) {
|
||
try {
|
||
await getTransporter().sendMail({
|
||
from: mail.mail_from,
|
||
to: mail.mail_to,
|
||
replyTo: mail.reply_to,
|
||
subject: mail.subject,
|
||
html: mail.html,
|
||
text: mail.body_text,
|
||
});
|
||
await dbMarkSent(mail.id);
|
||
console.log(`[EmailQueue] ✓ "${mail.subject}" → "${mail.mail_to}" gesendet`);
|
||
} catch (err) {
|
||
const msg = err instanceof Error ? err.message : String(err);
|
||
await dbMarkRetry(mail.id, mail.retry_count, msg);
|
||
}
|
||
}
|
||
}
|
||
|
||
// ─── Worker starten (Singleton) ───────────────────────────────────────────
|
||
|
||
let workerStarted = false;
|
||
|
||
export function startEmailQueueWorker(): void {
|
||
if (workerStarted) return;
|
||
workerStarted = true;
|
||
|
||
console.log("[EmailQueue] Worker gestartet – prüft alle 60 Sekunden");
|
||
|
||
// Sofort beim Start: Mails aus vorherigen Abstürzen verarbeiten
|
||
processQueue().catch((e) => console.error("[EmailQueue] Initialer Lauf fehlgeschlagen:", e));
|
||
|
||
setInterval(() => {
|
||
processQueue().catch((e) => console.error("[EmailQueue] Lauf fehlgeschlagen:", e));
|
||
}, 60_000);
|
||
}
|