Files
radiola-backend/src/enrich/enrichment.service.ts
nk dcc2f599f9 fix(enrich): непрерывный бэкафилл (пополнять очередь когда почти пуста)
Прежнее условие «очередь пуста» почти не срабатывало — now-playing-крон держал
очередь занятой, и холодный бэклог (~21k) не двигался. Теперь раз в минуту
подкидываем 100 pending когда очередь почти пуста; играющие треки идут вперёд
по приоритету.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 15:05:51 +03:00

226 lines
8.7 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { PrismaService } from '../prisma/prisma.service';
import { DiscogsService } from './discogs.service';
import { CoverStorageService } from './cover-storage.service';
/**
* Оркестратор обогащения трека: при первом появлении трека подтягиваем
* жанр/стиль/лейбл/год из Discogs и сохраняем обложку в едином формате (WebP)
* у себя. Дальше пользователю отдаём только из своей БД — внешние сервисы
* в рантайме не дёргаем.
*/
@Injectable()
export class EnrichmentService {
private readonly logger = new Logger(EnrichmentService.name);
// Очередь обогащения с троттлингом (под лимиты Discogs/iTunes)
private readonly queue: string[] = [];
private running = false;
private readonly throttleMs = 1000;
private readonly concurrency = 3;
constructor(
private readonly prisma: PrismaService,
private readonly discogs: DiscogsService,
private readonly covers: CoverStorageService,
) {}
// Поставить трек в очередь. priority — играющие сейчас треки (в начало очереди),
// чтобы обложка успела появиться, пока трек звучит.
enqueue(trackId: string, opts?: { priority?: boolean }): void {
const idx = this.queue.indexOf(trackId);
if (idx !== -1) {
if (opts?.priority && idx > 0) {
this.queue.splice(idx, 1);
this.queue.unshift(trackId);
}
return;
}
if (opts?.priority) this.queue.unshift(trackId);
else this.queue.push(trackId);
void this.drain();
}
// Непрерывно добираем холодный бэклог: когда очередь почти пуста — подкидываем
// батч pending (не-приоритетно, играющие треки всё равно идут вперёд).
// Раз в минуту, чтобы конвейер не простаивал между всплесками now-playing.
@Cron(CronExpression.EVERY_MINUTE)
async backfill(): Promise<void> {
if (!this.discogs.enabled) return; // без токена смысла нет — не крутим вхолостую
if (this.queue.length > this.concurrency) return; // ещё есть что жевать
const pending = await this.prisma.track.findMany({
where: { enrichStatus: 'pending' },
select: { id: true },
orderBy: { firstSeenAt: 'desc' },
take: 100,
});
for (const t of pending) this.enqueue(t.id);
}
// Раз в минуту гарантируем обложку у играющих СЕЙЧАС треков: создаём Track
// при отсутствии (без записи проигрывания) и приоритетно обогащаем тех, у кого
// ещё нет обложки. Так now-playing-обложки появляются быстро у всех сетей.
@Cron(CronExpression.EVERY_MINUTE)
async enrichNowPlaying(): Promise<void> {
const rows = await this.prisma.nowPlaying.findMany({
select: { artist: true, song: true },
});
for (const r of rows) {
const artist = (r.artist ?? '').trim();
const song = (r.song ?? '').trim();
if (!artist || !song) continue;
const normKey = this.buildNormKey(artist, song);
const track = await this.prisma.track.upsert({
where: { normKey },
create: { normKey, artist, song },
update: {},
select: { id: true, coverUrl: true },
});
if (!track.coverUrl) this.enqueue(track.id, { priority: true });
}
}
// Нормализованный ключ — как в ChartsService.recordPlay
private buildNormKey(artist: string, song: string): string {
return (
artist.toLowerCase().replace(/\s+/g, ' ') +
'|' +
song.toLowerCase().replace(/\s+/g, ' ')
);
}
private async drain(): Promise<void> {
if (this.running) return;
this.running = true;
try {
while (this.queue.length > 0) {
const batch = this.queue.splice(0, this.concurrency);
await Promise.all(batch.map((id) => this.enrichOne(id)));
await this.sleep(this.throttleMs);
}
} finally {
this.running = false;
}
}
private async enrichOne(trackId: string): Promise<void> {
try {
const track = await this.prisma.track.findUnique({
where: { id: trackId },
});
if (!track || track.enrichStatus === 'done') return;
const data = this.discogs.enabled
? await this.discogs.lookup(track.artist, track.song)
: null;
// iTunes: обложка (покрытие почти как у Record) + альбом/год/жанр как
// фолбэк к Discogs. Гибрид: стили и лейбл — только Discogs.
const itunes = await this.fetchItunes(track.artist, track.song);
// Обложка → WebP к себе (если ещё не наша)
let coverUrl = track.coverUrl;
const candidate = itunes?.coverUrl ?? data?.coverImageUrl ?? track.coverUrl;
if (candidate && !this.isSelfHosted(candidate)) {
const stored = await this.covers.store(candidate, track.normKey);
if (stored) coverUrl = stored;
}
// Жанр: Discogs приоритетнее (тонкий), затем iTunes (грубый фолбэк)
const genre = data?.genre ?? itunes?.genre ?? track.genre;
const year = data?.year ?? itunes?.year ?? track.year;
const album = track.album ?? itunes?.album ?? null;
const releaseDate =
track.releaseDate ??
itunes?.releaseDate ??
(data?.year ? new Date(Date.UTC(data.year, 0, 1)) : null);
// Без токена Discogs стили/лейбл не получим — оставляем pending, чтобы
// добрать позже (но обложку/жанр-iTunes уже сохранили).
const enriched = this.discogs.enabled;
await this.prisma.track.update({
where: { id: trackId },
data: {
genre,
styles: data?.styles?.length ? data.styles : track.styles,
label: data?.label ?? track.label,
year,
album,
discogsId: data?.discogsId ?? track.discogsId,
coverUrl,
releaseDate,
enrichStatus: enriched ? 'done' : 'pending',
enrichedAt: enriched ? new Date() : track.enrichedAt,
},
});
this.logger.debug(
`Обогащён "${track.artist}${track.song}": genre=${genre ?? '—'}, label=${data?.label ?? '—'}`,
);
} catch (e) {
this.logger.debug(`Обогащение ${trackId} не удалось: ${(e as Error).message}`);
await this.prisma.track
.update({ where: { id: trackId }, data: { enrichStatus: 'failed' } })
.catch(() => undefined);
}
}
// iTunes Search API (без ключа, высокое покрытие): обложка (600×600) +
// альбом/год/жанр/дата релиза.
private async fetchItunes(
artist: string,
song: string,
): Promise<{
coverUrl: string | null;
album: string | null;
year: number | null;
releaseDate: Date | null;
genre: string | null;
} | null> {
try {
const term = encodeURIComponent(`${artist} ${song}`.trim());
const url = `https://itunes.apple.com/search?term=${term}&entity=song&limit=1`;
const res = await fetch(url, {
headers: { 'User-Agent': 'radiOLA/1.0 +https://radiola.app' },
});
if (!res.ok) return null;
const data = (await res.json()) as {
results?: Array<{
artworkUrl100?: string;
collectionName?: string;
releaseDate?: string;
primaryGenreName?: string;
}>;
};
const r = data.results?.[0];
if (!r) return null;
const cover = r.artworkUrl100
? r.artworkUrl100.replace(/\/\d+x\d+bb\./, '/600x600bb.')
: null;
const rd = r.releaseDate ? new Date(r.releaseDate) : null;
const validDate = rd && !isNaN(rd.getTime()) ? rd : null;
return {
coverUrl: cover,
album: r.collectionName ?? null,
year: validDate ? validDate.getUTCFullYear() : null,
releaseDate: validDate,
genre: r.primaryGenreName ?? null,
};
} catch {
return null;
}
}
private isSelfHosted(url: string): boolean {
return url.includes('/covers/');
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}