feat(stations): корректный health-check + эндпоинт offline-ids
health-check переписан: живой = пришли заголовки 200-399 (рвём соединение сразу, не ждём бесконечное тело аудиопотока), параллельно, прогон при старте + ежечасно. Раньше GET висел на живых потоках до таймаута → ложный offline. Новый GET /stations/offline-ids отдаёт station_id оффлайн-станций — клиент их скрывает. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -1,72 +1,104 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { PrismaService } from '../prisma/prisma.service';
|
||||
import * as http from 'http';
|
||||
import * as https from 'https';
|
||||
|
||||
@Injectable()
|
||||
export class HealthCheckService {
|
||||
export class HealthCheckService implements OnModuleInit {
|
||||
private readonly logger = new Logger(HealthCheckService.name);
|
||||
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
/**
 * Schedules one health-check run shortly after startup so that `isOnline`
 * is up to date after a deploy, without waiting for the hourly cron run.
 * The run is deferred by 15 seconds and is not awaited by module init.
 */
async onModuleInit() {
  setTimeout(() => {
    // Fire-and-forget: nothing awaits this promise, so a rejection (for
    // example the station query failing) has to be handled here, otherwise
    // it surfaces as an unhandled promise rejection.
    this.checkAllStations().catch((error: unknown) => {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Стартовая проверка станций завершилась ошибкой: ${reason}`);
    });
  }, 15000);
}
|
||||
|
||||
/**
 * Station health check, scheduled with CronExpression.EVERY_HOUR and also
 * invoked once from onModuleInit.
 *
 * NOTE(review): this span is a rendered diff. Lines of the removed sequential
 * implementation (onlineCount/offlineCount, checkStation) and of the added
 * batched implementation (online/offline, isAlive) are interleaved, and the
 * `|` / `||||` rows are rendering residue rather than code.
 *
 * Added implementation, as far as visible here: loads id, streamUrl and
 * isOnline for every station, checks them in batches of CONC (24) through
 * isAlive, updates a station row only when its status differs from the stored
 * one, and logs the online/offline totals at the end.
 */
@Cron(CronExpression.EVERY_HOUR)
|
||||
async checkAllStations() {
|
||||
this.logger.log('Starting hourly station health check...');
|
||||
const stations = await this.prisma.station.findMany();
|
||||
let onlineCount = 0;
|
||||
let offlineCount = 0;
|
||||
this.logger.log('Проверка доступности станций...');
|
||||
const stations = await this.prisma.station.findMany({
|
||||
select: { id: true, streamUrl: true, isOnline: true },
|
||||
});
|
||||
|
||||
for (const station of stations) {
|
||||
try {
|
||||
const isOnline = await this.checkStation(station.streamUrl);
|
||||
await this.prisma.station.update({
|
||||
where: { id: station.id },
|
||||
data: { isOnline, lastCheckAt: new Date() },
|
||||
});
|
||||
if (isOnline) onlineCount++;
|
||||
else offlineCount++;
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Failed to check station ${station.name}: ${error.message}`,
|
||||
);
|
||||
await this.prisma.station.update({
|
||||
where: { id: station.id },
|
||||
data: { isOnline: false, lastCheckAt: new Date() },
|
||||
});
|
||||
offlineCount++;
|
||||
}
|
||||
let online = 0;
|
||||
let offline = 0;
|
||||
const CONC = 24;
|
||||
|
||||
for (let i = 0; i < stations.length; i += CONC) {
|
||||
const batch = stations.slice(i, i + CONC);
|
||||
await Promise.all(
|
||||
batch.map(async (s) => {
|
||||
const isOnline = await this.isAlive(s.streamUrl);
|
||||
if (isOnline) online++;
|
||||
else offline++;
|
||||
// Write only when the status changed — less load on the DB.
// NOTE(review): as a consequence lastCheckAt is written only on a status change.
|
||||
if (isOnline !== s.isOnline) {
|
||||
await this.prisma.station
|
||||
.update({
|
||||
where: { id: s.id },
|
||||
data: { isOnline, lastCheckAt: new Date() },
|
||||
})
|
||||
// NOTE(review): a failed update is swallowed here without logging.
.catch(() => undefined);
|
||||
}
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`Health check complete. Online: ${onlineCount}, Offline: ${offlineCount}`,
|
||||
);
|
||||
this.logger.log(`Проверка завершена. Online: ${online}, Offline: ${offline}`);
|
||||
}
|
||||
|
||||
private async checkStation(url: string): Promise<boolean> {
|
||||
const controller = new AbortController();
|
||||
const timeout = setTimeout(() => controller.abort(), 10000);
|
||||
|
||||
try {
|
||||
const response = await fetch(url, {
|
||||
method: 'HEAD',
|
||||
signal: controller.signal,
|
||||
});
|
||||
clearTimeout(timeout);
|
||||
return response.status >= 200 && response.status < 400;
|
||||
} catch {
|
||||
clearTimeout(timeout);
|
||||
// Fallback to GET if HEAD fails
|
||||
try {
|
||||
const controller2 = new AbortController();
|
||||
const timeout2 = setTimeout(() => controller2.abort(), 10000);
|
||||
const response = await fetch(url, {
|
||||
method: 'GET',
|
||||
signal: controller2.signal,
|
||||
});
|
||||
clearTimeout(timeout2);
|
||||
return response.status >= 200 && response.status < 400;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
/**
 * Stream liveness: a stream counts as alive once response headers arrive
 * with a status of 200–399. An audio stream sends its body indefinitely, so
 * the connection is torn down right after the headers (see `probe`).
 * An error, a 4xx/5xx status or a timeout counts as dead.
 * Up to 2 attempts are made, with a 300 ms pause between them.
 *
 * @param url Stream URL to check.
 * @returns `true` as soon as one attempt succeeds, `false` if both fail.
 */
private async isAlive(url: string): Promise<boolean> {
  const maxAttempts = 2;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    // Pause only between attempts: sleeping after the last failure would
    // just delay the "offline" verdict without another retry following it.
    if (attempt > 0) await this.sleep(300);
    if (await this.probe(url)) return true;
  }
  return false;
}
|
||||
|
||||
/**
 * Issues a single GET to `url` and reports whether response headers arrived
 * with a status in the 200–399 range. The request is destroyed as soon as the
 * headers are in, so the response body is never read.
 *
 * Never rejects: a request error, the 8 s socket timeout, or a synchronous
 * failure (such as an unparsable URL) all resolve to `false`.
 *
 * @param url Stream URL to probe.
 * @returns Promise resolving to `true` for a 2xx/3xx response, else `false`.
 */
private probe(url: string): Promise<boolean> {
  return new Promise((resolve) => {
    let settled = false;
    // More than one handler can run for the same request (e.g. 'timeout'
    // followed by 'error'); only the first outcome is reported.
    const settle = (alive: boolean) => {
      if (settled) return;
      settled = true;
      resolve(alive);
    };

    try {
      // Choose the module from the parsed scheme. URL lower-cases the
      // protocol and ignores surrounding whitespace, so "HTTPS://…" still
      // selects https; a case-sensitive prefix test would pick http and the
      // request would fail on the protocol mismatch.
      const lib = new URL(url).protocol === 'https:' ? https : http;
      const req = lib.get(
        url,
        {
          timeout: 8000,
          headers: { 'User-Agent': 'Mozilla/5.0', 'Icy-MetaData': '1' },
        },
        (res) => {
          const status = res.statusCode ?? 0;
          // Headers are all that is needed; drop the connection instead of
          // reading a body that may never end.
          req.destroy();
          settle(status >= 200 && status < 400);
        },
      );
      req.on('error', () => settle(false));
      req.on('timeout', () => {
        // The 'timeout' event does not abort the request; do it explicitly.
        req.destroy();
        settle(false);
      });
    } catch {
      // Invalid URL or unsupported protocol: treat as not alive.
      settle(false);
    }
  });
}
|
||||
|
||||
/**
 * Returns a promise that resolves after a `setTimeout` of `ms` milliseconds.
 * The promise never rejects.
 */
private sleep(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(wake, ms);
  });
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user