- track_plays(played_at,track_id,station_id) покрывающий + tracks(first_seen_at) WHERE pending частичный (применены CONCURRENTLY на проде + миграция idempotent) - ChartsService.getTopTracks: in-memory TTL-кэш 90с по (period,genre,limit) → детальная страница и параллельные запросы не пересчитывают тяжёлые агрегации - NowPlayingService.ingest: не пишем now_playing и не шлём сокет, если трек не изменился (было ~20k бесполезных upsert/час) - enrichNowPlaying: вместо N+1 (300 upsert/мин) — один batched findMany по normKey Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
476 lines
19 KiB
TypeScript
476 lines
19 KiB
TypeScript
import { Injectable, Logger } from '@nestjs/common';
|
||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||
import { ProxyAgent } from 'undici';
|
||
import { PrismaService } from '../prisma/prisma.service';
|
||
import { DiscogsService } from './discogs.service';
|
||
import { CoverStorageService } from './cover-storage.service';
|
||
|
||
/**
|
||
* Оркестратор обогащения трека: при первом появлении трека подтягиваем
|
||
* жанр/стиль/лейбл/год из Discogs и сохраняем обложку в едином формате (WebP)
|
||
* у себя. Дальше пользователю отдаём только из своей БД — внешние сервисы
|
||
* в рантайме не дёргаем.
|
||
*/
|
||
@Injectable()
|
||
export class EnrichmentService {
|
||
private readonly logger = new Logger(EnrichmentService.name);
|
||
|
||
// Очередь обогащения с троттлингом (под лимиты Discogs/iTunes)
|
||
private readonly queue: string[] = [];
|
||
private running = false;
|
||
// Discogs сам себя лимитирует (rate-limiter в DiscogsService), поэтому можно
|
||
// выше параллельность: обложки (iTunes, без лимита) льются быстрее.
|
||
private readonly throttleMs = 150;
|
||
private readonly concurrency = 12;
|
||
|
||
// RU-IP сервера забанен Apple (429) и Deezer из РФ отдаёт пустой каталог —
|
||
// поэтому iTunes/Deezer ходят через тот же DE-прокси, что и Discogs.
|
||
private readonly proxyDispatcher = process.env.DISCOGS_PROXY
|
||
? new ProxyAgent(process.env.DISCOGS_PROXY)
|
||
: undefined;
|
||
|
||
// iTunes лимитирует ПО IP (~20/мин) и легко банит общий DE-IP (его делит
|
||
// Discogs) — сериализуем запросы к iTunes с интервалом.
|
||
private itunesGate: Promise<void> = Promise.resolve();
|
||
private readonly itunesMinIntervalMs = 3500;
|
||
|
||
constructor(
|
||
private readonly prisma: PrismaService,
|
||
private readonly discogs: DiscogsService,
|
||
private readonly covers: CoverStorageService,
|
||
) {}
|
||
|
||
// Поставить трек в очередь. priority — играющие сейчас треки (в начало очереди),
|
||
// чтобы обложка успела появиться, пока трек звучит.
|
||
enqueue(trackId: string, opts?: { priority?: boolean }): void {
|
||
const idx = this.queue.indexOf(trackId);
|
||
if (idx !== -1) {
|
||
if (opts?.priority && idx > 0) {
|
||
this.queue.splice(idx, 1);
|
||
this.queue.unshift(trackId);
|
||
}
|
||
return;
|
||
}
|
||
if (opts?.priority) this.queue.unshift(trackId);
|
||
else this.queue.push(trackId);
|
||
void this.drain();
|
||
}
|
||
|
||
// Непрерывно добираем холодный бэклог: когда очередь почти пуста — подкидываем
|
||
// батч pending (не-приоритетно, играющие треки всё равно идут вперёд).
|
||
// Раз в минуту, чтобы конвейер не простаивал между всплесками now-playing.
|
||
@Cron(CronExpression.EVERY_MINUTE)
|
||
async backfill(): Promise<void> {
|
||
if (!this.discogs.enabled) return; // без токена смысла нет — не крутим вхолостую
|
||
if (this.queue.length > this.concurrency) return; // ещё есть что жевать
|
||
const pending = await this.prisma.track.findMany({
|
||
where: { enrichStatus: 'pending' },
|
||
select: { id: true },
|
||
orderBy: { firstSeenAt: 'desc' },
|
||
take: 100,
|
||
});
|
||
for (const t of pending) this.enqueue(t.id);
|
||
}
|
||
|
||
// Раз в минуту обеспечиваем ОБЛОЖКУ у играющих СЕЙЧАС треков — быстрый проход
|
||
// ТОЛЬКО через iTunes (без Discogs, который лимитирован 54/мин и тормозил бы
|
||
// обложки). Полное обогащение (жанр/стили) идёт фоном через backfill/enqueue.
|
||
private nowPlayingRunning = false;
|
||
|
||
@Cron(CronExpression.EVERY_MINUTE)
|
||
async enrichNowPlaying(): Promise<void> {
|
||
if (this.nowPlayingRunning) return; // не накладываем проходы
|
||
this.nowPlayingRunning = true;
|
||
try {
|
||
await this.runEnrichNowPlaying();
|
||
} finally {
|
||
this.nowPlayingRunning = false;
|
||
}
|
||
}
|
||
|
||
private async runEnrichNowPlaying(): Promise<void> {
|
||
const rows = await this.prisma.nowPlaying.findMany({
|
||
select: { artist: true, song: true },
|
||
});
|
||
// Треки уже созданы в ChartsService.recordPlay — не upsert'им построчно (был
|
||
// N+1 на ~300 строк/мин), а читаем пачкой по normKey. Мусор/исключённые
|
||
// станции трек не создавали → их и не обогащаем (это правильно).
|
||
const normKeys = new Set<string>();
|
||
for (const r of rows) {
|
||
const artist = (r.artist ?? '').trim();
|
||
const song = (r.song ?? '').trim();
|
||
if (artist && song) normKeys.add(this.buildNormKey(artist, song));
|
||
}
|
||
if (normKeys.size === 0) return;
|
||
|
||
const tracks = await this.prisma.track.findMany({
|
||
where: { normKey: { in: [...normKeys] } },
|
||
select: {
|
||
id: true,
|
||
artist: true,
|
||
song: true,
|
||
normKey: true,
|
||
coverUrl: true,
|
||
enrichStatus: true,
|
||
},
|
||
});
|
||
|
||
const todo: { id: string; artist: string; song: string; normKey: string }[] = [];
|
||
for (const track of tracks) {
|
||
if (!track.coverUrl) {
|
||
todo.push({
|
||
id: track.id,
|
||
artist: track.artist,
|
||
song: track.song,
|
||
normKey: track.normKey,
|
||
});
|
||
}
|
||
// полное обогащение (жанр) — в общую очередь, если ещё не сделано
|
||
if (track.enrichStatus !== 'done') this.enqueue(track.id);
|
||
}
|
||
// Быстрый cover-only проход, по 8 параллельно — чтобы успевать за сменой
|
||
// треков по всем сетям (~120/мин)
|
||
for (let i = 0; i < todo.length; i += 8) {
|
||
await Promise.all(todo.slice(i, i + 8).map((t) => this.coverFast(t)));
|
||
}
|
||
}
|
||
|
||
// Только обложка — для быстрого покрытия эфира. Чтобы не множить нагрузку на
|
||
// iTunes (лимит ~20/мин на 170+ играющих треков), делаем ОДИН запрос iTunes по
|
||
// очищенному названию (он чаще матчит ремиксы/«(Original Mix)»), а на промахе/
|
||
// лимите идём в Deezer (отдельный лимит, хорошее покрытие электроники).
|
||
private async coverFast(t: {
|
||
id: string;
|
||
artist: string;
|
||
song: string;
|
||
normKey: string;
|
||
}): Promise<void> {
|
||
try {
|
||
const cover = await this.fetchCover(t.artist, t.song);
|
||
if (!cover?.coverUrl) return;
|
||
const stored = await this.covers.store(cover.coverUrl, t.normKey);
|
||
if (!stored) return;
|
||
await this.prisma.track.update({
|
||
where: { id: t.id },
|
||
data: {
|
||
coverUrl: stored,
|
||
genre: cover.genre ?? undefined,
|
||
album: cover.album ?? undefined,
|
||
},
|
||
});
|
||
} catch {
|
||
// сбой — добёрём на следующем тике
|
||
}
|
||
}
|
||
|
||
/** Только обложка для быстрого now-playing-прохода — ТОЛЬКО Deezer (через
|
||
* DE-прокси, высокий лимит, параллельно). iTunes здесь НЕ дёргаем: его жёсткий
|
||
* троттлинг (3.5с) затыкал бы проход. Промахи Deezer добирает фоновый
|
||
* enrichOne (там iTunes через прокси с троттлингом). */
|
||
private async fetchCover(
|
||
artist: string,
|
||
song: string,
|
||
): Promise<{ coverUrl: string | null; genre: string | null; album: string | null } | null> {
|
||
const dz = await this.fetchDeezerCover(artist, song);
|
||
if (dz) return { coverUrl: dz, genre: null, album: null };
|
||
return null;
|
||
}
|
||
|
||
// ===== Клиентский сабмит обложки =====
|
||
// Клиент (со своего IP) делает iTunes-поиск (наш серверный IP забанен Apple)
|
||
// и присылает ССЫЛКУ на арт. Сервер качает её (CDN из РФ доступен) и кладёт
|
||
// WebP к себе. SSRF-защита: только доверенные CDN. Идемпотентно (first-write-wins).
|
||
private static readonly COVER_HOST_ALLOW = ['mzstatic.com', 'dzcdn.net'];
|
||
private submitInFlight = 0;
|
||
private readonly submitMaxInFlight = 6;
|
||
|
||
async submitCover(
|
||
artist: string,
|
||
song: string,
|
||
artworkUrl: string,
|
||
): Promise<{ coverUrl: string | null }> {
|
||
const a = (artist ?? '').trim();
|
||
const s = (song ?? '').trim();
|
||
if (!a || !s || !artworkUrl) return { coverUrl: null };
|
||
|
||
let host = '';
|
||
try {
|
||
host = new URL(artworkUrl).hostname.toLowerCase();
|
||
} catch {
|
||
return { coverUrl: null };
|
||
}
|
||
const allowed = EnrichmentService.COVER_HOST_ALLOW.some(
|
||
(h) => host === h || host.endsWith('.' + h),
|
||
);
|
||
if (!allowed) return { coverUrl: null };
|
||
|
||
const normKey = this.buildNormKey(a, s);
|
||
// Уже есть — отдаём существующую (не качаем повторно, защита от перезаписи).
|
||
const existing = await this.prisma.track.findUnique({
|
||
where: { normKey },
|
||
select: { coverUrl: true },
|
||
});
|
||
if (existing?.coverUrl) return { coverUrl: existing.coverUrl };
|
||
|
||
if (this.submitInFlight >= this.submitMaxInFlight) return { coverUrl: null };
|
||
this.submitInFlight++;
|
||
try {
|
||
const stored = await this.covers.store(artworkUrl, normKey);
|
||
if (!stored) return { coverUrl: null };
|
||
await this.prisma.track.upsert({
|
||
where: { normKey },
|
||
create: { normKey, artist: a, song: s, coverUrl: stored },
|
||
update: { coverUrl: stored },
|
||
});
|
||
return { coverUrl: stored };
|
||
} finally {
|
||
this.submitInFlight--;
|
||
}
|
||
}
|
||
|
||
// Нормализованный ключ — как в ChartsService.recordPlay
|
||
private buildNormKey(artist: string, song: string): string {
|
||
return (
|
||
artist.toLowerCase().replace(/\s+/g, ' ') +
|
||
'|' +
|
||
song.toLowerCase().replace(/\s+/g, ' ')
|
||
);
|
||
}
|
||
|
||
private async drain(): Promise<void> {
|
||
if (this.running) return;
|
||
this.running = true;
|
||
try {
|
||
while (this.queue.length > 0) {
|
||
const batch = this.queue.splice(0, this.concurrency);
|
||
await Promise.all(batch.map((id) => this.enrichOne(id)));
|
||
await this.sleep(this.throttleMs);
|
||
}
|
||
} finally {
|
||
this.running = false;
|
||
}
|
||
}
|
||
|
||
private async enrichOne(trackId: string): Promise<void> {
|
||
try {
|
||
const track = await this.prisma.track.findUnique({
|
||
where: { id: trackId },
|
||
});
|
||
if (!track || track.enrichStatus === 'done') return;
|
||
|
||
const data = this.discogs.enabled
|
||
? await this.discogs.lookup(track.artist, track.song)
|
||
: null;
|
||
|
||
// iTunes: обложка (покрытие почти как у Record) + альбом/год/жанр как
|
||
// фолбэк к Discogs. Гибрид: стили и лейбл — только Discogs.
|
||
// Отличаем сбой запроса (ретраить) от чистого «не найдено» (done).
|
||
let itunes: Awaited<ReturnType<typeof this.fetchItunes>> = null;
|
||
let itunesFailed = false;
|
||
try {
|
||
itunes = await this.fetchItunes(track.artist, track.song);
|
||
} catch {
|
||
itunesFailed = true;
|
||
}
|
||
|
||
// Обложка → WebP к себе (если ещё не наша)
|
||
let coverUrl = track.coverUrl;
|
||
const candidate = itunes?.coverUrl ?? data?.coverImageUrl ?? track.coverUrl;
|
||
if (candidate && !this.isSelfHosted(candidate)) {
|
||
const stored = await this.covers.store(candidate, track.normKey);
|
||
if (stored) coverUrl = stored;
|
||
}
|
||
|
||
// Жанр: Discogs приоритетнее (тонкий), затем iTunes (грубый фолбэк)
|
||
const genre = data?.genre ?? itunes?.genre ?? track.genre;
|
||
const year = data?.year ?? itunes?.year ?? track.year;
|
||
const album = track.album ?? itunes?.album ?? null;
|
||
const releaseDate =
|
||
track.releaseDate ??
|
||
itunes?.releaseDate ??
|
||
(data?.year ? new Date(Date.UTC(data.year, 0, 1)) : null);
|
||
|
||
// Помечаем done, если обогатились. НЕ помечаем (оставляем pending для
|
||
// ретрая), если: нет токена Discogs, ИЛИ запрос к iTunes упал И обложку
|
||
// так и не получили (транзиентный сбой — промах не должен застывать).
|
||
const enriched = this.discogs.enabled && !(itunesFailed && !coverUrl);
|
||
|
||
await this.prisma.track.update({
|
||
where: { id: trackId },
|
||
data: {
|
||
genre,
|
||
styles: data?.styles?.length ? data.styles : track.styles,
|
||
label: data?.label ?? track.label,
|
||
year,
|
||
album,
|
||
discogsId: data?.discogsId ?? track.discogsId,
|
||
coverUrl,
|
||
releaseDate,
|
||
enrichStatus: enriched ? 'done' : 'pending',
|
||
enrichedAt: enriched ? new Date() : track.enrichedAt,
|
||
},
|
||
});
|
||
|
||
this.logger.debug(
|
||
`Обогащён "${track.artist} — ${track.song}": genre=${genre ?? '—'}, label=${data?.label ?? '—'}`,
|
||
);
|
||
} catch (e) {
|
||
this.logger.debug(`Обогащение ${trackId} не удалось: ${(e as Error).message}`);
|
||
await this.prisma.track
|
||
.update({ where: { id: trackId }, data: { enrichStatus: 'failed' } })
|
||
.catch(() => undefined);
|
||
}
|
||
}
|
||
|
||
// iTunes Search API (без ключа, высокое покрытие): обложка (600×600) +
|
||
// альбом/год/жанр/дата релиза.
|
||
private async fetchItunes(
|
||
artist: string,
|
||
song: string,
|
||
): Promise<{
|
||
coverUrl: string | null;
|
||
album: string | null;
|
||
year: number | null;
|
||
releaseDate: Date | null;
|
||
genre: string | null;
|
||
} | null> {
|
||
// Попытка 1: как есть. Многие треки несут суффиксы «(Original Mix)»,
|
||
// «(SEA)», «[... Dub]», «feat. X» — они ломают точный матч iTunes (limit=1).
|
||
let r = await this.itunesSearch(`${artist} ${song}`);
|
||
|
||
// Попытка 2: очищенный запрос (без скобок/квадратных/feat) — даёт обложку
|
||
// базового трека, когда точный ремикс не нашёлся.
|
||
if (!r?.coverUrl) {
|
||
const cleaned = `${this.stripNoise(artist)} ${this.stripNoise(song)}`
|
||
.replace(/\s+/g, ' ')
|
||
.trim();
|
||
const original = `${artist} ${song}`.toLowerCase();
|
||
if (cleaned && cleaned.toLowerCase() !== original) {
|
||
const r2 = await this.itunesSearch(cleaned);
|
||
if (r2?.coverUrl) r = r2;
|
||
}
|
||
}
|
||
|
||
// Попытка 3: Deezer (публичный API, без ключа) — у него хорошее покрытие
|
||
// электроники/ремиксов/лаунжа, которых нет в iTunes. Берём только обложку.
|
||
if (!r?.coverUrl) {
|
||
const dz = await this.fetchDeezerCover(artist, song);
|
||
if (dz) {
|
||
r = {
|
||
coverUrl: dz,
|
||
album: r?.album ?? null,
|
||
year: r?.year ?? null,
|
||
releaseDate: r?.releaseDate ?? null,
|
||
genre: r?.genre ?? null,
|
||
};
|
||
}
|
||
}
|
||
|
||
return r;
|
||
}
|
||
|
||
/** Убирает «шумовые» суффиксы названия, мешающие матчу обложки. */
|
||
private stripNoise(s: string): string {
|
||
return s
|
||
.replace(/\([^)]*\)/g, ' ') // (Original Mix), (SEA), (feat. X)
|
||
.replace(/\[[^\]]*\]/g, ' ') // [Luxar Brooklyn Dub]
|
||
.replace(/\b(?:feat|ft|featuring)\.?\s+.*$/gi, ' ') // feat. X …
|
||
.replace(/[^\p{L}\p{N}]+/gu, ' ')
|
||
.replace(/\s+/g, ' ')
|
||
.trim();
|
||
}
|
||
|
||
/** Сериализует запросы к iTunes с минимальным интервалом (защита от 429/бана). */
|
||
private async itunesThrottle(): Promise<void> {
|
||
const prev = this.itunesGate;
|
||
let release!: () => void;
|
||
this.itunesGate = new Promise<void>((r) => (release = r));
|
||
await prev;
|
||
setTimeout(release, this.itunesMinIntervalMs);
|
||
}
|
||
|
||
/** Один поиск в iTunes по уже собранному запросу. Бросает при сбое сети/HTTP
|
||
* (отличаем сбой от чистого «не найдено» → null). */
|
||
private async itunesSearch(rawTerm: string): Promise<{
|
||
coverUrl: string | null;
|
||
album: string | null;
|
||
year: number | null;
|
||
releaseDate: Date | null;
|
||
genre: string | null;
|
||
} | null> {
|
||
const clean = rawTerm
|
||
.replace(/[^\p{L}\p{N}]+/gu, ' ')
|
||
.replace(/\s+/g, ' ')
|
||
.trim();
|
||
if (!clean) return null;
|
||
const term = encodeURIComponent(clean);
|
||
const url = `https://itunes.apple.com/search?term=${term}&entity=song&limit=1`;
|
||
await this.itunesThrottle();
|
||
const init: RequestInit & { dispatcher?: unknown } = {
|
||
headers: { 'User-Agent': 'radiOLA/1.0 +https://radiola.app' },
|
||
};
|
||
if (this.proxyDispatcher) init.dispatcher = this.proxyDispatcher;
|
||
const res = await fetch(url, init);
|
||
if (!res.ok) throw new Error(`iTunes ${res.status}`);
|
||
const data = (await res.json()) as {
|
||
results?: Array<{
|
||
artworkUrl100?: string;
|
||
collectionName?: string;
|
||
releaseDate?: string;
|
||
primaryGenreName?: string;
|
||
}>;
|
||
};
|
||
const r = data.results?.[0];
|
||
if (!r) return null;
|
||
|
||
const cover = r.artworkUrl100
|
||
? r.artworkUrl100.replace(/\/\d+x\d+bb\./, '/600x600bb.')
|
||
: null;
|
||
const rd = r.releaseDate ? new Date(r.releaseDate) : null;
|
||
const validDate = rd && !isNaN(rd.getTime()) ? rd : null;
|
||
|
||
return {
|
||
coverUrl: cover,
|
||
album: r.collectionName ?? null,
|
||
year: validDate ? validDate.getUTCFullYear() : null,
|
||
releaseDate: validDate,
|
||
genre: r.primaryGenreName ?? null,
|
||
};
|
||
}
|
||
|
||
/** Обложка из Deezer (фолбэк). Best-effort: при любой ошибке → null. */
|
||
private async fetchDeezerCover(
|
||
artist: string,
|
||
song: string,
|
||
): Promise<string | null> {
|
||
try {
|
||
const q = `${this.stripNoise(artist)} ${this.stripNoise(song)}`
|
||
.replace(/\s+/g, ' ')
|
||
.trim();
|
||
if (!q) return null;
|
||
const url = `https://api.deezer.com/search?limit=1&q=${encodeURIComponent(q)}`;
|
||
const init: RequestInit & { dispatcher?: unknown } = {
|
||
headers: { 'User-Agent': 'radiOLA/1.0 +https://radiola.app' },
|
||
};
|
||
if (this.proxyDispatcher) init.dispatcher = this.proxyDispatcher;
|
||
const res = await fetch(url, init);
|
||
if (!res.ok) return null;
|
||
const data = (await res.json()) as {
|
||
data?: Array<{ album?: { cover_xl?: string; cover_big?: string } }>;
|
||
};
|
||
const al = data.data?.[0]?.album;
|
||
return al?.cover_xl ?? al?.cover_big ?? null;
|
||
} catch {
|
||
return null;
|
||
}
|
||
}
|
||
|
||
private isSelfHosted(url: string): boolean {
|
||
return url.includes('/covers/');
|
||
}
|
||
|
||
private sleep(ms: number): Promise<void> {
|
||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||
}
|
||
}
|