feat(now-playing): DFM и др. ICY-станции — обложки + чарты + ротация

ICY-станции (DFM и пр.) теперь полноценно «как Record»:
- ICY-поллер вызывает recordPlay → треки идут в чарты и обогащаются Discogs,
  откуда берётся обложка (раньше now_playing писался напрямую, мимо чартов)
- обложка now-playing: если источник не дал (ICY всегда null) — подставляем
  обложку обогащённого трека из нашей БД по normKey (NowPlayingService.resolveCover)
- ротация курсора по всем станциям (окно 70) вместо первых 50 по кругу —
  раньше 363 из 413 станций не опрашивались
- общий NowPlayingService.ingest для Record и ICY (дедуп логики)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
nk
2026-06-03 14:08:07 +03:00
parent 149421740f
commit f379110975
2 changed files with 115 additions and 85 deletions

View File

@@ -1,26 +1,42 @@
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Interval } from '@nestjs/schedule'; import { Interval } from '@nestjs/schedule';
import { PrismaService } from '../prisma/prisma.service'; import { PrismaService } from '../prisma/prisma.service';
import { NowPlayingGateway } from './now-playing.gateway'; import { NowPlayingService } from './now-playing.service';
import * as http from 'http'; import * as http from 'http';
import * as https from 'https'; import * as https from 'https';
/**
* Сбор now-playing для не-Record станций (DFM и др.) через ICY-метаданные потока.
* Станций много (сотни), поэтому за один тик опрашиваем окно и сдвигаем курсор —
* за несколько минут проходим все по кругу. Обложку и зачёт в чарты/обогащение
* берёт на себя NowPlayingService.ingest (обложка подтянется из нашей БД).
*/
@Injectable() @Injectable()
export class IcyNowPlayingService { export class IcyNowPlayingService {
private readonly logger = new Logger(IcyNowPlayingService.name); private readonly logger = new Logger(IcyNowPlayingService.name);
private cursor = 0;
private readonly windowSize = 70;
constructor( constructor(
private readonly prisma: PrismaService, private readonly prisma: PrismaService,
private readonly gateway: NowPlayingGateway, private readonly nowPlayingService: NowPlayingService,
) {} ) {}
@Interval(60000) @Interval(60000)
async pollIcyNowPlaying() { async pollIcyNowPlaying() {
this.logger.log('Starting ICY now playing poll...'); const where = { recordStationId: null, isOnline: true };
const total = await this.prisma.station.count({ where });
if (total === 0) return;
if (this.cursor >= total) this.cursor = 0;
const offset = this.cursor;
const stations = await this.prisma.station.findMany({ const stations = await this.prisma.station.findMany({
where: { recordStationId: null, isOnline: true }, where,
take: 50, orderBy: { stationId: 'asc' },
skip: offset,
take: this.windowSize,
}); });
this.cursor += this.windowSize;
let successCount = 0; let successCount = 0;
@@ -29,47 +45,26 @@ export class IcyNowPlayingService {
const results = await Promise.allSettled( const results = await Promise.allSettled(
batch.map(async (station) => { batch.map(async (station) => {
const track = await this.parseIcyMetadata(station.streamUrl); const track = await this.parseIcyMetadata(station.streamUrl);
if (!track) return null; if (!track || !track.artist || !track.song) return null;
const updated = await this.prisma.nowPlaying.upsert({ await this.nowPlayingService.ingest({
where: { stationId: station.id }, stationDbId: station.id,
create: { stationNumericId: station.stationId,
stationId: station.id,
song: track.song,
artist: track.artist,
coverUrl: null,
},
update: {
song: track.song,
artist: track.artist,
coverUrl: null,
},
});
this.gateway.broadcastNowPlaying(station.stationId.toString(), {
song: track.song,
artist: track.artist, artist: track.artist,
song: track.song,
coverUrl: null, coverUrl: null,
updatedAt: updated.updatedAt,
}); });
return track; return track;
}), }),
); );
for (let j = 0; j < results.length; j++) { for (const result of results) {
const result = results[j]; if (result.status === 'fulfilled' && result.value) successCount++;
if (result.status === 'fulfilled' && result.value) {
successCount++;
} else if (result.status === 'rejected') {
this.logger.warn(
`ICY failed for ${batch[j].name}: ${result.reason?.message || result.reason}`,
);
}
} }
} }
this.logger.log( this.logger.log(
`ICY poll complete: ${successCount}/${stations.length} stations updated`, `ICY poll: ${successCount}/${stations.length} updated (offset ${offset}/${total})`,
); );
} }

View File

@@ -56,49 +56,17 @@ export class NowPlayingService {
const mapping = this.recordSync.getStationByNowPlayingId(np.id); const mapping = this.recordSync.getStationByNowPlayingId(np.id);
if (!mapping) continue; if (!mapping) continue;
const coverUrl = np.track.image600 ?? np.track.image200 ?? np.track.image100; const coverUrl =
np.track.image600 ?? np.track.image200 ?? np.track.image100;
// Получаем текущее состояние до апдейта, чтобы определить смену трека await this.ingest({
const prev = await this.prisma.nowPlaying.findUnique({ stationDbId: mapping.dbId,
where: { stationId: mapping.dbId }, stationNumericId: mapping.stationId,
});
const updated = await this.prisma.nowPlaying.upsert({
where: { stationId: mapping.dbId },
create: {
stationId: mapping.dbId,
song: np.track.song,
artist: np.track.artist,
coverUrl,
},
update: {
song: np.track.song,
artist: np.track.artist,
coverUrl,
},
});
this.gateway.broadcastNowPlaying(mapping.stationId.toString(), {
song: np.track.song,
artist: np.track.artist, artist: np.track.artist,
song: np.track.song,
coverUrl, coverUrl,
updatedAt: updated.updatedAt,
}); });
updatedCount++; updatedCount++;
// Засчитываем проигрывание только при смене трека
const trackChanged =
!prev ||
prev.song !== np.track.song ||
prev.artist !== np.track.artist;
if (trackChanged) {
void this.chartsService.recordPlay({
artist: np.track.artist,
song: np.track.song,
coverUrl,
stationDbId: mapping.dbId,
});
}
} }
this.logger.log( this.logger.log(
@@ -113,6 +81,12 @@ export class NowPlayingService {
stationId: string, stationId: string,
data: { song: string; artist: string; coverUrl?: string }, data: { song: string; artist: string; coverUrl?: string },
) { ) {
const coverUrl = await this.resolveCover(
data.artist,
data.song,
data.coverUrl,
);
// Получаем текущее состояние до апдейта, чтобы определить смену трека // Получаем текущее состояние до апдейта, чтобы определить смену трека
const prev = await this.prisma.nowPlaying.findUnique({ const prev = await this.prisma.nowPlaying.findUnique({
where: { stationId }, where: { stationId },
@@ -120,23 +94,14 @@ export class NowPlayingService {
const nowPlaying = await this.prisma.nowPlaying.upsert({ const nowPlaying = await this.prisma.nowPlaying.upsert({
where: { stationId }, where: { stationId },
create: { create: { stationId, song: data.song, artist: data.artist, coverUrl },
stationId, update: { song: data.song, artist: data.artist, coverUrl },
song: data.song,
artist: data.artist,
coverUrl: data.coverUrl,
},
update: {
song: data.song,
artist: data.artist,
coverUrl: data.coverUrl,
},
}); });
this.gateway.broadcastNowPlaying(stationId, { this.gateway.broadcastNowPlaying(stationId, {
song: data.song, song: data.song,
artist: data.artist, artist: data.artist,
coverUrl: data.coverUrl, coverUrl,
updatedAt: nowPlaying.updatedAt, updatedAt: nowPlaying.updatedAt,
}); });
@@ -147,7 +112,7 @@ export class NowPlayingService {
void this.chartsService.recordPlay({ void this.chartsService.recordPlay({
artist: data.artist, artist: data.artist,
song: data.song, song: data.song,
coverUrl: data.coverUrl, coverUrl,
stationDbId: stationId, stationDbId: stationId,
}); });
} }
@@ -155,6 +120,76 @@ export class NowPlayingService {
return nowPlaying; return nowPlaying;
} }
/**
* Универсальный приём now-playing из любого источника (Record / ICY).
* Если источник не дал обложку — подставляем обложку обогащённого трека
* из нашей БД (по normKey). Обновляет now_playing, шлёт сокет, засчитывает
* проигрывание при смене трека (что запускает обогащение через Discogs).
*/
async ingest(params: {
stationDbId: string;
stationNumericId: number;
artist: string;
song: string;
coverUrl?: string | null;
}): Promise<void> {
const { stationDbId, stationNumericId, artist, song } = params;
const coverUrl = await this.resolveCover(artist, song, params.coverUrl);
const prev = await this.prisma.nowPlaying.findUnique({
where: { stationId: stationDbId },
});
const updated = await this.prisma.nowPlaying.upsert({
where: { stationId: stationDbId },
create: { stationId: stationDbId, song, artist, coverUrl },
update: { song, artist, coverUrl },
});
this.gateway.broadcastNowPlaying(stationNumericId.toString(), {
song,
artist,
coverUrl,
updatedAt: updated.updatedAt,
});
const trackChanged = !prev || prev.song !== song || prev.artist !== artist;
if (trackChanged) {
void this.chartsService.recordPlay({
artist,
song,
coverUrl,
stationDbId,
});
}
}
// Нормализованный ключ трека — совпадает с ChartsService.recordPlay
private buildNormKey(artist: string, song: string): string {
return (
artist.trim().toLowerCase().replace(/\s+/g, ' ') +
'|' +
song.trim().toLowerCase().replace(/\s+/g, ' ')
);
}
// Обложка: если источник дал — берём её, иначе обложку из обогащённого трека
private async resolveCover(
artist: string,
song: string,
provided?: string | null,
): Promise<string | null> {
if (provided) return provided;
const a = (artist ?? '').trim();
const s = (song ?? '').trim();
if (!a || !s) return null;
const track = await this.prisma.track.findUnique({
where: { normKey: this.buildNormKey(a, s) },
select: { coverUrl: true },
});
return track?.coverUrl ?? null;
}
async getNowPlaying(stationId: string) { async getNowPlaying(stationId: string) {
return this.prisma.nowPlaying.findUnique({ return this.prisma.nowPlaying.findUnique({
where: { stationId }, where: { stationId },