origin/apps/server/src/models/resource/pipe/resource.pipeline.ts

86 lines
2.4 KiB
TypeScript

import { PrismaClient, Resource } from '@prisma/client';
import { ProcessResult, ResourceProcessor } from '../types';
import { db, ResourceStatus } from '@nice/common';
import { Logger } from '@nestjs/common';
/**
 * Sequential processing pipeline for a single resource.
 *
 * Processors are registered with `addProcessor` and executed by `execute` in
 * registration order. Every status transition and each processor's output is
 * written through `db.resource.update`.
 */
export class ResourceProcessingPipeline {
  private readonly processors: ResourceProcessor[] = [];
  private readonly logger = new Logger(ResourceProcessingPipeline.name);

  /**
   * Appends a processor to the end of the pipeline.
   *
   * @param processor - Processor to run after all previously added ones.
   * @returns This pipeline, so calls can be chained.
   */
  addProcessor(processor: ResourceProcessor): ResourceProcessingPipeline {
    this.processors.push(processor);
    return this;
  }

  /**
   * Runs every registered processor against the resource, in order.
   *
   * The resource is first marked `PROCESSING`; before each processor runs the
   * status is set to that processor's class name; after the last processor it
   * is marked `PROCESSED`. On any failure the resource is marked
   * `PROCESS_FAILED` on a best-effort basis.
   *
   * @param resource - The resource to process.
   * @returns `{ success: true, resource }` when every step succeeded,
   *   otherwise `{ success: false, resource, error }`. Failures are reported
   *   through the result rather than by rejecting. If the `PROCESS_FAILED`
   *   write itself fails, `resource` is the last state this method obtained
   *   and may not carry the failed status.
   */
  async execute(resource: Resource): Promise<ProcessResult> {
    let currentResource = resource;
    try {
      this.logger.log(`开始处理资源: ${resource.id}`);
      currentResource = await this.updateProcessStatus(
        resource.id,
        ResourceStatus.PROCESSING,
      );
      this.logger.log(`资源状态已更新为处理中`);

      for (const processor of this.processors) {
        const processorName = processor.constructor.name;
        this.logger.log(`开始执行处理器: ${processorName}`);
        // NOTE(review): the processor's class name is stored as the status.
        // The cast is unchecked; presumably ResourceStatus is string-valued
        // and class names survive the build (no minification) - confirm.
        currentResource = await this.updateProcessStatus(
          currentResource.id,
          processorName as ResourceStatus,
        );
        currentResource = await processor.process(currentResource);
        this.logger.log(`处理器 ${processorName} 执行完成`);
        // Persist whatever the processor returned and continue with the
        // stored row. NOTE(review): the whole record (including `id`) is
        // passed as update data - verify this is accepted for every column.
        currentResource = await db.resource.update({
          where: { id: currentResource.id },
          data: currentResource,
        });
      }

      currentResource = await this.updateProcessStatus(
        currentResource.id,
        ResourceStatus.PROCESSED,
      );
      this.logger.log(
        `资源 ${resource.id} 处理成功 ${JSON.stringify(currentResource.meta)}`,
      );
      return {
        success: true,
        resource: currentResource,
      };
    } catch (error) {
      this.logger.error(`资源 ${resource.id} 处理失败:`, error);
      try {
        currentResource = await this.updateProcessStatus(
          currentResource.id,
          ResourceStatus.PROCESS_FAILED,
        );
      } catch (statusError) {
        // The failure-status write can fail too (e.g. the database is what
        // broke in the first place). Log it and still report the original
        // error instead of letting this secondary failure replace it.
        this.logger.error(
          `资源 ${resource.id} 标记为处理失败时出错:`,
          statusError,
        );
      }
      return {
        success: false,
        resource: currentResource,
        // Anything can be thrown; make sure callers really get an Error.
        error: error instanceof Error ? error : new Error(String(error)),
      };
    }
  }

  /**
   * Writes a new status for the given resource.
   *
   * @param resourceId - Id of the resource row to update.
   * @param status - Status value to store.
   * @returns The resource as returned by `db.resource.update`.
   */
  private async updateProcessStatus(
    resourceId: string,
    status: ResourceStatus,
  ): Promise<Resource> {
    return db.resource.update({
      where: { id: resourceId },
      data: { status },
    });
  }
}