import { Injectable, Logger } from '@nestjs/common';
import { Queue, Job } from 'bullmq'; // Job is needed for the addToProcessorPipe return type
import { InjectQueue } from '@nestjs/bullmq';
import { db, Resource, ResourceType } from '@nicestack/common';
import { HookRequest, HookResponse } from './types';
import dayjs from 'dayjs';
import { getFilenameWithoutExt } from '@server/utils/file';
import { QueueJobType } from '@server/queue/types';
import { nanoid } from 'nanoid';
import { slugify } from 'transliteration';
import path from 'path';
import fs from 'node:fs';

/**
 * Handles tus upload hooks: assigns a normalized storage ID before an upload
 * is created, and records the finished upload as a resource that is then
 * queued on `file-queue` for processing.
 */
@Injectable()
export class UploadService {
  private readonly logger = new Logger(UploadService.name);

  constructor(
    @InjectQueue('file-queue') private fileQueue: Queue
  ) { }

  /**
   * Handles the `pre-create` hook.
   *
   * Rejects the upload with HTTP 400 when the upload metadata has no
   * `filename` key. Otherwise rewrites the upload ID to
   * `<fileId>/<fileId><ext>`, where `fileId` is
   * `<YYMMDDHHmmss>_<slugified name, max 32 chars>_<6-char nanoid>`.
   *
   * @param hookRequest Hook payload; only `Event.Upload.MetaData` is read.
   * @returns The hook response, either a rejection or a `ChangeFileInfo` instruction.
   */
  async handlePreCreateHook(hookRequest: HookRequest): Promise<HookResponse> {
    const hookResponse: HookResponse = {
      HTTPResponse: {
        Headers: {}
      }
    };
    const metaData = hookRequest.Event.Upload.MetaData;
    const isValid = metaData && 'filename' in metaData;

    if (!isValid) {
      hookResponse.RejectUpload = true;
      hookResponse.HTTPResponse.StatusCode = 400;
      hookResponse.HTTPResponse.Body = 'no filename provided';
      hookResponse.HTTPResponse.Headers['X-Some-Header'] = 'yes';
    } else {
      const timestamp = dayjs().format('YYMMDDHHmmss');
      const originalName = metaData.filename;
      const extension = path.extname(originalName); // file extension, including the leading dot

      // Transliterate and sanitize the file name (extension excluded):
      // keep only word characters and dashes, collapse dash runs, cap the length.
      const cleanName = slugify(getFilenameWithoutExt(originalName), {
        lowercase: true,
        separator: '-',
        trim: true
      })
        .replace(/[^\w-]/g, '')
        .replace(/-+/g, '-')
        .substring(0, 32);

      const uniqueId = nanoid(6);
      const fileId = `${timestamp}_${cleanName}_${uniqueId}`;
      // await fs.promises.mkdir(path.join(process.env.UPLOAD_DIR, fileId), { recursive: true });

      hookResponse.ChangeFileInfo = {
        ID: `${fileId}/${fileId}${extension}`,
        MetaData: {
          filename: originalName,
          normalized_name: cleanName,
          folder: fileId
        }
      };
    }
    return hookResponse;
  }

  /**
   * Handles the `post-finish` hook: creates a resource row for the finished
   * upload and enqueues it for processing.
   *
   * @param hookRequest Hook payload; `ID`, `Size` and `MetaData` of the upload are read.
   */
  async handlePostFinishHook(hookRequest: HookRequest): Promise<void> {
    const { ID, Size, MetaData } = hookRequest.Event.Upload;
    const filename = MetaData?.filename;

    const resource = await db.resource.create({
      data: {
        filename,
        fileId: ID,
        title: getFilenameWithoutExt(filename), // title is the file name without its extension
        metadata: MetaData || {}
      }
    });
    await this.addToProcessorPipe(resource);
    this.logger.log(`Upload ${ID} (${Size} bytes) is finished.`);
  }

  /**
   * Dispatches a tus hook request to the matching handler.
   *
   * Both handlers are awaited inside the `try` block so that their failures
   * are logged here and rethrown to the caller instead of escaping as
   * unhandled promise rejections.
   *
   * @param hookRequest Hook payload; `Type` selects the handler.
   * @returns The pre-create handler's response, or an empty response for every other hook type.
   * @throws Rethrows whatever the selected handler throws, after logging it.
   */
  async handleTusHook(hookRequest: HookRequest): Promise<HookResponse> {
    const hookResponse: HookResponse = {
      HTTPResponse: {
        Headers: {}
      }
    };
    try {
      if (hookRequest.Type === 'pre-create') {
        // `return await` keeps a rejection inside this try/catch.
        return await this.handlePreCreateHook(hookRequest);
      }
      if (hookRequest.Type === 'post-finish') {
        await this.handlePostFinishHook(hookRequest);
      }
      return hookResponse;
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      this.logger.error(`Error handling hook: ${message}`);
      throw error;
    }
  }

  /**
   * Enqueues a file-processing job for the given resource.
   *
   * The resource id is used as the job id; the job is retried up to 3 times
   * and removed from the queue once it completes.
   *
   * @param resource The resource to process.
   * @returns The job created on the queue.
   */
  async addToProcessorPipe(resource: Resource): Promise<Job> {
    const job = await this.fileQueue.add(
      QueueJobType.FILE_PROCESS,
      {
        resource,
        timestamp: Date.now()
      },
      {
        attempts: 3,
        removeOnComplete: true,
        jobId: resource.id
      }
    );
    return job;
  }
}