origin/apps/server/src/queue/worker/processor.ts

67 lines
2.0 KiB
TypeScript
Raw Normal View History

2024-08-02 19:48:38 +08:00
import { Job } from 'bullmq';
2024-12-30 08:26:40 +08:00
import { Logger } from '@nestjs/common';
2025-01-06 08:45:23 +08:00
import { ObjectType } from '@nice/common';
2025-02-06 16:32:52 +08:00
// import {
// updateCourseEnrollmentStats,
// updateCourseReviewStats,
// } from '@server/models/course/utils';
import { QueueJobType } from '../types';
2025-01-03 09:24:46 +08:00
import {
2025-01-27 22:43:31 +08:00
updateCourseEnrollmentStats,
updateCourseReviewStats,
2025-02-06 16:32:52 +08:00
updateParentLectureStats,
} from '@server/models/post/utils';
2025-02-25 08:25:54 +08:00
import {
updatePostViewCount,
updateTotalCourseViewCount,
} from '../models/post/utils';
2025-01-03 09:24:46 +08:00
// Module-level logger used by the job processor below; created with the name 'QueueWorker'.
const logger = new Logger('QueueWorker');
/**
 * Recomputes the statistics requested by an `UPDATE_STATS` job.
 *
 * Reads `sectionId`, `courseId` and `type` from `job.data`:
 * - when `sectionId` is truthy, the section is refreshed through
 *   `updateParentLectureStats`;
 * - when `courseId` is truthy, the updater is selected by `type`
 *   (`LECTURE`, `ENROLLMENT` or `POST`).
 *
 * @param job - The `UPDATE_STATS` job whose payload selects what to refresh.
 */
async function processUpdateStats(
  job: Job<any, any, QueueJobType>,
): Promise<void> {
  const { sectionId, courseId, type } = job.data;

  // Handle section statistics.
  if (sectionId) {
    await updateParentLectureStats(sectionId);
    logger.debug(`Updated section stats for sectionId: ${sectionId}`);
  }

  // Return early when there is no courseId: nothing course-related to refresh.
  if (!courseId) {
    return;
  }

  // Handle course-related statistics.
  switch (type) {
    case ObjectType.LECTURE:
      await updateParentLectureStats(courseId);
      break;
    case ObjectType.ENROLLMENT:
      await updateCourseEnrollmentStats(courseId);
      break;
    case ObjectType.POST:
      await updateCourseReviewStats(courseId);
      break;
    default:
      // No updater ran for this type, so do not fall through to the
      // "Updated course stats" message below.
      logger.warn(`Unknown update stats type: ${type}`);
      return;
  }

  logger.debug(
    `Updated course stats for courseId: ${courseId}, type: ${type}`,
  );
}

/**
 * Queue worker entry point: dispatches a job to the matching updater based on
 * `job.name`.
 *
 * Handled job names: `UPDATE_STATS`, `UPDATE_POST_VISIT_COUNT`,
 * `UPDATE_POST_STATE` and `UPDATE_TOTAL_COURSE_VIEW_COUNT`. Any other job name
 * is ignored.
 *
 * Errors raised by an updater are logged and NOT rethrown, so this function
 * always resolves.
 * NOTE(review): because the promise never rejects, the queue presumably
 * treats a failed update as a successful job (no retry) — confirm this
 * best-effort behavior is intended.
 *
 * @param job - The queue job to process; its payload shape depends on
 *   `job.name`.
 */
export default async function processJob(
  job: Job<any, any, QueueJobType>,
): Promise<void> {
  try {
    if (job.name === QueueJobType.UPDATE_STATS) {
      await processUpdateStats(job);
    }
    if (job.name === QueueJobType.UPDATE_POST_VISIT_COUNT) {
      await updatePostViewCount(job.data.id, job.data.type);
    }
    // NOTE(review): this branch calls the same view-count updater as
    // UPDATE_POST_VISIT_COUNT — confirm whether UPDATE_POST_STATE was meant
    // to call a different updater.
    if (job.name === QueueJobType.UPDATE_POST_STATE) {
      await updatePostViewCount(job.data.id, job.data.type);
    }
    if (job.name === QueueJobType.UPDATE_TOTAL_COURSE_VIEW_COUNT) {
      await updateTotalCourseViewCount(job.data.type);
    }
  } catch (error: unknown) {
    // Narrow before reading `message`/`stack`: a thrown value is not
    // guaranteed to be an Error (it may even be null or undefined).
    const message = error instanceof Error ? error.message : String(error);
    const stack = error instanceof Error ? error.stack : undefined;
    logger.error(`Error processing stats update job: ${message}`, stack);
  }
}