doctor-mail/apps/server/src/queue/stats/stats.service.ts

70 lines
2.5 KiB
TypeScript
Raw Normal View History

2025-01-03 09:24:46 +08:00
import { InjectQueue } from "@nestjs/bullmq";
import { Injectable } from "@nestjs/common";
import EventBus from "@server/utils/event-bus";
import { Queue } from "bullmq";
2025-01-06 08:45:23 +08:00
import { ObjectType } from "@nice/common";
2025-01-03 09:24:46 +08:00
import { QueueJobType } from "../types";
@Injectable()
export class StatsService {
constructor(
@InjectQueue('general') private generalQueue: Queue
) {
EventBus.on("dataChanged", async ({ type, data }) => {
const jobOptions = {
removeOnComplete: true,
jobId: this.generateJobId(type as ObjectType, data) // 使用唯一ID防止重复任务
};
switch (type) {
case ObjectType.ENROLLMENT:
await this.generalQueue.add(
QueueJobType.UPDATE_STATS,
{
courseId: data.courseId,
type: ObjectType.ENROLLMENT
},
jobOptions
);
break;
case ObjectType.LECTURE:
await this.generalQueue.add(
QueueJobType.UPDATE_STATS,
{
sectionId: data.sectionId,
courseId: data.courseId,
type: ObjectType.LECTURE
},
jobOptions
);
break;
case ObjectType.POST:
if (data.courseId) {
await this.generalQueue.add(
QueueJobType.UPDATE_STATS,
{
courseId: data.courseId,
type: ObjectType.POST
},
jobOptions
);
}
break;
}
});
}
private generateJobId(type: ObjectType, data: any): string {
// 根据类型和相关ID生成唯一的job标识
switch (type) {
case ObjectType.ENROLLMENT:
return `stats_${type}_${data.courseId}`;
case ObjectType.LECTURE:
return `stats_${type}_${data.courseId}_${data.sectionId}`;
case ObjectType.POST:
return `stats_${type}_${data.courseId}`;
default:
return `stats_${type}_${Date.now()}`;
}
}
}