staff_data/apps/server/src/queue/stats/stats.service.ts

69 lines
2.0 KiB
TypeScript
Executable File

import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import EventBus from '@server/utils/event-bus';
import { Queue } from 'bullmq';
import { ObjectType } from '@nice/common';
import { QueueJobType } from '../types';
@Injectable()
export class StatsService {
constructor(@InjectQueue('general') private generalQueue: Queue) {
EventBus.on('dataChanged', async ({ type, data }) => {
const jobOptions = {
removeOnComplete: true,
jobId: this.generateJobId(type as ObjectType, data), // 使用唯一ID防止重复任务
};
switch (type) {
case ObjectType.ENROLLMENT:
await this.generalQueue.add(
QueueJobType.UPDATE_STATS,
{
courseId: data.courseId,
type: ObjectType.ENROLLMENT,
},
jobOptions,
);
break;
case ObjectType.LECTURE:
await this.generalQueue.add(
QueueJobType.UPDATE_STATS,
{
sectionId: data.sectionId,
courseId: data.courseId,
type: ObjectType.LECTURE,
},
jobOptions,
);
break;
case ObjectType.POST:
if (data.courseId) {
await this.generalQueue.add(
QueueJobType.UPDATE_STATS,
{
courseId: data.courseId,
type: ObjectType.POST,
},
jobOptions,
);
}
break;
}
});
}
private generateJobId(type: ObjectType, data: any): string {
// 根据类型和相关ID生成唯一的job标识
switch (type) {
case ObjectType.ENROLLMENT:
return `stats_${type}_${data.courseId}`;
case ObjectType.LECTURE:
return `stats_${type}_${data.courseId}_${data.sectionId}`;
case ObjectType.POST:
return `stats_${type}_${data.courseId}`;
default:
return `stats_${type}_${Date.now()}`;
}
}
}