import { PrismaClient, Resource } from '@prisma/client'
import { ProcessResult, ResourceProcessor } from '../types'
import { db, ResourceStatus } from '@nice/common'
import { Logger } from '@nestjs/common';

/**
 * Runs a resource through an ordered chain of processors, persisting the
 * resource's status and each processor's output to the database as it goes.
 *
 * Processors run sequentially in the order they were added; the output of one
 * processor is the input of the next.
 */
export class ResourceProcessingPipeline {
  private readonly processors: ResourceProcessor[] = []
  private readonly logger = new Logger(ResourceProcessingPipeline.name);

  /**
   * Appends a processor to the end of the pipeline.
   *
   * @param processor - Processor to run after all previously added ones.
   * @returns This pipeline, so calls can be chained.
   */
  addProcessor(processor: ResourceProcessor): ResourceProcessingPipeline {
    this.processors.push(processor)
    return this
  }

  /**
   * Executes every registered processor against the resource.
   *
   * The status is set to `PROCESSING` before the first processor runs and to
   * `PROCESSED` after the last one succeeds. If any step throws, the status is
   * set to `PROCESS_FAILED` and a failure result is returned instead of the
   * error being rethrown.
   *
   * @param resource - The resource to process.
   * @returns A result with `success: true` and the final resource, or
   *   `success: false` with the last known resource state and the error.
   */
  async execute(resource: Resource): Promise<ProcessResult> {
    let currentResource = resource
    try {
      this.logger.log(`开始处理资源: ${resource.id}`)
      currentResource = await this.updateProcessStatus(
        resource.id,
        ResourceStatus.PROCESSING
      )
      this.logger.log(`资源状态已更新为处理中`)

      for (const processor of this.processors) {
        const processorName = processor.constructor.name
        this.logger.log(`开始执行处理器: ${processorName}`)

        // The processor's class name is stored as the intermediate status.
        // NOTE(review): this assertion is unchecked — confirm that every
        // processor class name is a valid ResourceStatus value.
        currentResource = await this.updateProcessStatus(
          currentResource.id,
          processorName as ResourceStatus
        )

        currentResource = await processor.process(currentResource)
        this.logger.log(`处理器 ${processorName} 执行完成`)

        // Persist the processor's output before moving on to the next step.
        // NOTE(review): the whole record (including `id`) is sent as update
        // data — confirm this is intended rather than a partial update.
        currentResource = await db.resource.update({
          where: { id: currentResource.id },
          data: currentResource
        })
      }

      currentResource = await this.updateProcessStatus(
        currentResource.id,
        ResourceStatus.PROCESSED
      )
      this.logger.log(`资源 ${resource.id} 处理成功 ${JSON.stringify(currentResource.metadata)}`)

      return {
        success: true,
        resource: currentResource
      }
    } catch (error: unknown) {
      const failure = this.toError(error)
      this.logger.error(`资源 ${resource.id} 处理失败:`, failure.stack)

      try {
        currentResource = await this.updateProcessStatus(
          currentResource.id,
          ResourceStatus.PROCESS_FAILED
        )
      } catch (statusError: unknown) {
        // Recording the failure can itself fail (e.g. the record is missing or
        // the database is unreachable). Keep the original error as the reported
        // cause and fall back to the last known resource state so that
        // execute() still resolves with a failure result.
        this.logger.error(
          `资源 ${resource.id} 更新失败状态时出错:`,
          this.toError(statusError).stack
        )
      }

      return {
        success: false,
        resource: currentResource,
        error: failure
      }
    }
  }

  /**
   * Writes a new status for the resource and returns the updated record.
   *
   * @param resourceId - Id of the resource row to update.
   * @param status - Status value to store.
   * @returns The resource as returned by the database after the update.
   */
  private async updateProcessStatus(
    resourceId: string,
    status: ResourceStatus
  ): Promise<Resource> {
    return db.resource.update({
      where: { id: resourceId },
      data: { status }
    })
  }

  /** Normalizes an arbitrary thrown value into an Error instance. */
  private toError(error: unknown): Error {
    return error instanceof Error ? error : new Error(String(error))
  }
}