From eca128de5fd1af26913ba4bd2ada282ac0f05058 Mon Sep 17 00:00:00 2001 From: longdayi <13477510+longdayilongdayi@user.noreply.gitee.com> Date: Mon, 6 Jan 2025 18:30:16 +0800 Subject: [PATCH] 01061830 --- .continue/prompts/comment.prompt | 1 - .continue/prompts/explain.prompt | 24 +- apps/server/package.json | 2 +- .../models/resource/pipe/resource.pipeline.ts | 14 +- .../resource/processor/GeneralProcessor.ts | 52 ---- .../resource/processor/ImageProcessor.ts | 4 +- .../src/models/resource/resource.module.ts | 3 +- .../src/models/resource/resource.service.ts | 13 +- apps/server/src/models/resource/types.ts | 1 - .../server/src/queue/worker/file.processor.ts | 6 - apps/server/src/upload/chunk.manager.ts | 135 --------- apps/server/src/upload/clean.service.ts | 52 ---- apps/server/src/upload/tus.service.ts | 104 +++++++ apps/server/src/upload/upload-lock.service.ts | 88 ------ apps/server/src/upload/upload.controller.ts | 141 ++++++--- apps/server/src/upload/upload.module.ts | 8 +- apps/server/src/upload/upload.service.ts | 285 ------------------ apps/web/src/app/main/home/page.tsx | 131 ++++---- packages/client/src/tools/index.ts | 3 +- packages/common/prisma/schema.prisma | 46 ++- packages/common/src/enum.ts | 10 +- packages/tus/src/handlers/BaseHandler.ts | 7 - packages/tus/src/handlers/GetHandler.ts | 2 +- packages/tus/src/handlers/PatchHandler.ts | 24 +- packages/tus/src/handlers/PostHandler.ts | 29 +- packages/tus/src/index.ts | 1 + packages/tus/src/server.ts | 40 +-- packages/tus/src/utils/constants.ts | 1 + packages/tus/src/utils/models/Context.ts | 47 --- packages/tus/src/utils/models/DataStore.ts | 2 +- packages/tus/src/utils/models/Locker.ts | 20 +- .../tus/src/utils/models/StreamSplitter.ts | 7 +- packages/tus/src/utils/models/Upload.ts | 61 +++- pnpm-lock.yaml | 21 +- 34 files changed, 452 insertions(+), 933 deletions(-) delete mode 100644 apps/server/src/models/resource/processor/GeneralProcessor.ts delete mode 100644 apps/server/src/upload/chunk.manager.ts delete mode 100644 apps/server/src/upload/clean.service.ts create mode 100644 apps/server/src/upload/tus.service.ts delete mode 100644 apps/server/src/upload/upload-lock.service.ts delete mode 100644 apps/server/src/upload/upload.service.ts diff --git a/.continue/prompts/comment.prompt b/.continue/prompts/comment.prompt index f556eb8..68cb798 100644 --- a/.continue/prompts/comment.prompt +++ b/.continue/prompts/comment.prompt @@ -5,7 +5,6 @@ maxTokens: 8192 角色定位: - 高级软件开发工程师 - 代码文档化与知识传播专家 - 注释目标: 1. 顶部注释 - 模块/文件整体功能描述 diff --git a/.continue/prompts/explain.prompt b/.continue/prompts/explain.prompt index fcce931..0cfb31e 100644 --- a/.continue/prompts/explain.prompt +++ b/.continue/prompts/explain.prompt @@ -2,27 +2,5 @@ temperature: 0.5 maxTokens: 8192 --- -角色定位: -- 身份: 高级软件开发工程师 -- 专业能力: 深入代码架构分析 -- 分析维度: 技术、设计、性能、最佳实践 - -分析要求: -1. 代码逐行详细注释 -2. 注释必须包含: - - 代码意图解析 - - 技术原理阐述 - - 数据结构解读 - -输出规范: -- 全中文专业技术文档注释 -- 注释风格: 标准文档型 -- 保留原代码结构 -- 注释与代码同步展示 -- 技术性、专业性并重 - -禁止: -- 不返回无关说明 -- 不进行无意义的介绍 -- 严格遵循技术分析本身 +你的任务是基于专业的计算机知识背景剖析代码原理,逐行进行详细分析,充分解释代码意图,并对代码的数据结构,算法或编码方式等进行深度剖析和举例说明,所有分析以中文标准文档型注释的形式插入原代码,除了返回带有分析的代码外,不要返回任何信息. 
\ No newline at end of file diff --git a/apps/server/package.json b/apps/server/package.json index 84b8e99..b8c2cdb 100755 --- a/apps/server/package.json +++ b/apps/server/package.json @@ -30,10 +30,10 @@ "@nestjs/schedule": "^4.1.0", "@nestjs/websockets": "^10.3.10", "@nice/common": "workspace:*", + "@nice/tus": "workspace:*", "@trpc/server": "11.0.0-rc.456", "@tus/file-store": "^1.5.1", "@tus/s3-store": "^1.6.2", - "@tus/server": "^1.10.0", "argon2": "^0.41.1", "axios": "^1.7.2", "bullmq": "^5.12.0", diff --git a/apps/server/src/models/resource/pipe/resource.pipeline.ts b/apps/server/src/models/resource/pipe/resource.pipeline.ts index 4a1ce4c..ce132da 100644 --- a/apps/server/src/models/resource/pipe/resource.pipeline.ts +++ b/apps/server/src/models/resource/pipe/resource.pipeline.ts @@ -1,6 +1,6 @@ import { PrismaClient, Resource } from '@prisma/client' import { ProcessResult, ResourceProcessor } from '../types' -import { db, ResourceProcessStatus } from '@nice/common' +import { db, ResourceStatus } from '@nice/common' import { Logger } from '@nestjs/common'; @@ -25,7 +25,7 @@ export class ResourceProcessingPipeline { currentResource = await this.updateProcessStatus( resource.id, - ResourceProcessStatus.PROCESSING + ResourceStatus.PROCESSING ) this.logger.log(`资源状态已更新为处理中`) @@ -35,7 +35,7 @@ export class ResourceProcessingPipeline { currentResource = await this.updateProcessStatus( currentResource.id, - processor.constructor.name as ResourceProcessStatus + processor.constructor.name as ResourceStatus ) currentResource = await processor.process(currentResource) @@ -49,7 +49,7 @@ export class ResourceProcessingPipeline { currentResource = await this.updateProcessStatus( currentResource.id, - ResourceProcessStatus.SUCCESS + ResourceStatus.PROCESSED ) this.logger.log(`资源 ${resource.id} 处理成功 ${JSON.stringify(currentResource.metadata)}`) @@ -62,7 +62,7 @@ export class ResourceProcessingPipeline { currentResource = await this.updateProcessStatus( currentResource.id, - ResourceProcessStatus.FAILED + ResourceStatus.PROCESS_FAILED ) return { @@ -74,11 +74,11 @@ export class ResourceProcessingPipeline { } private async updateProcessStatus( resourceId: string, - processStatus: ResourceProcessStatus + status: ResourceStatus ): Promise { return db.resource.update({ where: { id: resourceId }, - data: { processStatus } + data: { status } }) } } diff --git a/apps/server/src/models/resource/processor/GeneralProcessor.ts b/apps/server/src/models/resource/processor/GeneralProcessor.ts deleted file mode 100644 index be2f731..0000000 --- a/apps/server/src/models/resource/processor/GeneralProcessor.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { BaseMetadata, FileMetadata, ResourceProcessor } from "../types"; -import { Resource, db, ResourceProcessStatus } from "@nice/common"; -import { extname } from "path"; -import mime from "mime"; -import { calculateFileHash, getUploadFilePath } from "@server/utils/file"; -import { Logger } from "@nestjs/common"; -import { statSync } from "fs"; // Add this import -export class GeneralProcessor implements ResourceProcessor { - private logger = new Logger(GeneralProcessor.name); - - async process(resource: Resource): Promise { - const originMeta = resource.metadata as any; - try { - // 提取文件扩展名作为type - const fileExtension = extname(resource.filename).toLowerCase().slice(1); - this.logger.debug(`文件扩展名: ${fileExtension}`); - - // 获取文件路径并验证 - const filePath = getUploadFilePath(resource.fileId); - this.logger.debug(`文件路径: ${filePath}`); - const fileStats = statSync(filePath); - 
const fileSize = fileStats.size; - const modifiedAt = fileStats.mtime; - const createdAt = fileStats.birthtime; - - const fileHash = await calculateFileHash(filePath); - // 准备metadata - const metadata: BaseMetadata = { - filename: resource.filename, - extension: fileExtension, - mimeType: mime.lookup(fileExtension) || 'application/octet-stream', - size: fileSize, - modifiedAt, - createdAt, - }; - - const updatedResource = await db.resource.update({ - where: { id: resource.id }, - data: { - type: fileExtension, - hash: fileHash, - metadata: { ...originMeta, ...metadata } as any, - } - }); - - return updatedResource; - - } catch (error: any) { - throw new Error(`Resource processing failed: ${error.message}`); - } - } -} \ No newline at end of file diff --git a/apps/server/src/models/resource/processor/ImageProcessor.ts b/apps/server/src/models/resource/processor/ImageProcessor.ts index 4cd99c1..05b9041 100644 --- a/apps/server/src/models/resource/processor/ImageProcessor.ts +++ b/apps/server/src/models/resource/processor/ImageProcessor.ts @@ -1,10 +1,10 @@ import path from "path"; import sharp from 'sharp'; import { FileMetadata, ImageMetadata, ResourceProcessor } from "../types"; -import { Resource, ResourceProcessStatus, db } from "@nice/common"; +import { Resource, ResourceStatus, db } from "@nice/common"; import { getUploadFilePath } from "@server/utils/file"; import { Logger } from "@nestjs/common"; -import { promises as fsPromises } from 'fs'; + export class ImageProcessor implements ResourceProcessor { private logger = new Logger(ImageProcessor.name) constructor() { } diff --git a/apps/server/src/models/resource/resource.module.ts b/apps/server/src/models/resource/resource.module.ts index 7d0f578..153bc6e 100644 --- a/apps/server/src/models/resource/resource.module.ts +++ b/apps/server/src/models/resource/resource.module.ts @@ -1,9 +1,10 @@ import { Module } from '@nestjs/common'; import { ResourceRouter } from './resource.router'; import { ResourceService } from './resource.service'; +import { TrpcService } from '@server/trpc/trpc.service'; @Module({ exports: [ResourceRouter, ResourceService], - providers: [ResourceRouter, ResourceService], + providers: [ResourceRouter, ResourceService, TrpcService], }) export class ResourceModule { } diff --git a/apps/server/src/models/resource/resource.service.ts b/apps/server/src/models/resource/resource.service.ts index a64529f..9671d85 100644 --- a/apps/server/src/models/resource/resource.service.ts +++ b/apps/server/src/models/resource/resource.service.ts @@ -6,6 +6,7 @@ import { ObjectType, Prisma, Resource, + ResourceStatus, } from '@nice/common'; @Injectable() @@ -20,7 +21,7 @@ export class ResourceService extends BaseService { if (params?.staff) { args.data.ownerId = params?.staff?.id } - return this.create(args); + return super.create(args); } async checkFileExists(hash: string): Promise { return this.findFirst({ @@ -30,4 +31,14 @@ export class ResourceService extends BaseService { }, }); } + async softDeleteByFileId(fileId: string) { + return this.update({ + where: { + fileId, + }, + data: { + deletedAt: new Date(), + }, + }); + } } \ No newline at end of file diff --git a/apps/server/src/models/resource/types.ts b/apps/server/src/models/resource/types.ts index 12441a6..b766681 100644 --- a/apps/server/src/models/resource/types.ts +++ b/apps/server/src/models/resource/types.ts @@ -15,7 +15,6 @@ export interface BaseMetadata { filename: string extension: string modifiedAt: Date - createdAt: Date } /** * 图片特有元数据接口 diff --git 
a/apps/server/src/queue/worker/file.processor.ts b/apps/server/src/queue/worker/file.processor.ts index b1d1374..fe4398b 100644 --- a/apps/server/src/queue/worker/file.processor.ts +++ b/apps/server/src/queue/worker/file.processor.ts @@ -2,13 +2,9 @@ import { Job } from 'bullmq'; import { Logger } from '@nestjs/common'; import { QueueJobType } from '../types'; import { ResourceProcessingPipeline } from '@server/models/resource/pipe/resource.pipeline'; -import { GeneralProcessor } from '@server/models/resource/processor/GeneralProcessor'; import { ImageProcessor } from '@server/models/resource/processor/ImageProcessor'; -import superjson from 'superjson-cjs'; -import { Resource } from '@nice/common'; const logger = new Logger('FileProcessorWorker'); const pipeline = new ResourceProcessingPipeline() - .addProcessor(new GeneralProcessor()) .addProcessor(new ImageProcessor()) export default async function processJob(job: Job) { if (job.name === QueueJobType.FILE_PROCESS) { @@ -17,8 +13,6 @@ export default async function processJob(job: Job) { if (!resource) { throw new Error('No resource provided in job data'); } - - const result = await pipeline.execute(resource); return result; diff --git a/apps/server/src/upload/chunk.manager.ts b/apps/server/src/upload/chunk.manager.ts deleted file mode 100644 index ae41ec6..0000000 --- a/apps/server/src/upload/chunk.manager.ts +++ /dev/null @@ -1,135 +0,0 @@ -import * as fs from 'fs/promises'; -import * as path from 'path'; -import { Injectable, Logger } from '@nestjs/common'; -import { ChunkDto } from '@nice/common'; - -@Injectable() -export class ChunkManager { - private readonly logger = new Logger(ChunkManager.name); - - private readonly CHUNK_PROCESSING_CONCURRENCY = 3; - private readonly uploadDir: string; - private readonly tempDir: string; - - constructor() { - this.uploadDir = process.env.UPLOAD_DIR || path.join(process.cwd(), 'uploads'); - this.tempDir = path.join(this.uploadDir, 'temp'); - this.logger.localInstance.setLogLevels(["log", "debug"]) - this.logger.log(`Initialized ChunkManager with uploadDir: ${this.uploadDir}, tempDir: ${this.tempDir}`); - } - - private validateChunk(chunk: ChunkDto, file: Express.Multer.File): void { - this.logger.debug(`Validating chunk: identifier=${chunk?.identifier}, chunkNumber=${chunk?.chunkNumber}, bufferLength=${file?.buffer?.length}`); - - if (!chunk?.identifier || !chunk.chunkNumber || !file?.buffer?.length) { - this.logger.warn('Invalid chunk metadata or buffer'); - throw new Error('Invalid chunk metadata or buffer'); - } - } - - async save(chunk: ChunkDto, file: Express.Multer.File): Promise { - this.logger.log(`Saving chunk: identifier=${chunk.identifier}, chunkNumber=${chunk.chunkNumber}`); - this.validateChunk(chunk, file); - const chunkDir = path.join(this.tempDir, chunk.identifier); - const chunkPath = path.join(chunkDir, `${chunk.chunkNumber}`); - try { - this.logger.debug(`Creating chunk directory: ${chunkDir}`); - await fs.mkdir(chunkDir, { recursive: true, mode: 0o755 }); - - this.logger.debug(`Writing chunk file: ${chunkPath}`); - await fs.writeFile(chunkPath, file.buffer, { mode: 0o644 }); - - this.logger.log(`Chunk saved successfully: ${chunkPath}`); - } catch (error) { - this.logger.error(`Chunk save failed: ${error instanceof Error ? 
error.message : error}`); - throw error; - } - } - - async getChunks(identifier: string): Promise { - this.logger.log(`Retrieving chunks for identifier: ${identifier}`); - if (!identifier) { - this.logger.warn('No identifier provided'); - return []; - } - try { - const chunkPath = path.join(this.tempDir, identifier); - this.logger.debug(`Reading directory: ${chunkPath}`); - - const files = await fs.readdir(chunkPath); - const chunks = files - .map(Number) - .filter(num => !isNaN(num) && num > 0) - .sort((a, b) => a - b); - - this.logger.log(`Found chunks: ${chunks.join(', ')}`); - return chunks; - } catch (error) { - this.logger.warn(`Chunk retrieval failed: ${error}`); - return []; - } - } - - async merge(chunkDir: string, finalPath: string, totalChunks: number): Promise { - this.logger.log(`Merging chunks: chunkDir=${chunkDir}, finalPath=${finalPath}, totalChunks=${totalChunks}`); - - if (!chunkDir || !finalPath || totalChunks <= 0) { - this.logger.warn('Invalid merge parameters'); - throw new Error('Invalid merge parameters'); - } - - try { - this.logger.debug(`Opening final file: ${finalPath}`); - const fileHandle = await fs.open(finalPath, 'w'); - - try { - for (let i = 0; i < totalChunks; i += this.CHUNK_PROCESSING_CONCURRENCY) { - const batch = Array.from( - { length: Math.min(this.CHUNK_PROCESSING_CONCURRENCY, totalChunks - i) }, - (_, j) => i + j + 1 - ); - this.logger.debug(`Processing batch: ${batch.join(', ')}`); - const chunkBuffers = await Promise.all( - batch.map(chunkNumber => { - const chunkPath = path.join(chunkDir, `${chunkNumber}`); - this.logger.debug(`Reading chunk: ${chunkPath}`); - return fs.readFile(chunkPath); - }) - ); - for (const chunkBuffer of chunkBuffers) { - await fileHandle.write(chunkBuffer, 0, chunkBuffer.length); - } - } - } finally { - this.logger.debug(`Closing file handle: ${finalPath}`); - await fileHandle.close(); - } - - this.logger.log(`Merge completed successfully: ${finalPath}`); - } catch (error) { - this.logger.error(`Merge failed: ${error}`); - throw error; - } - } - - async cleanup(identifier: string): Promise { - this.logger.log(`Cleaning up chunks for identifier: ${identifier}`); - - if (!identifier) { - this.logger.warn('No identifier provided for cleanup'); - return; - } - - try { - const cleanupPath = path.join(this.tempDir, identifier); - this.logger.debug(`Removing directory: ${cleanupPath}`); - - await fs.rm(cleanupPath, { recursive: true, force: true }); - this.logger.log(`Cleanup completed for: ${identifier}`); - } catch (err) { - if (err instanceof Error && err.name !== 'ENOENT') { - this.logger.error(`Cleanup failed: ${err.message}`); - } - } - } -} \ No newline at end of file diff --git a/apps/server/src/upload/clean.service.ts b/apps/server/src/upload/clean.service.ts deleted file mode 100644 index 29df686..0000000 --- a/apps/server/src/upload/clean.service.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; -import { Cron, CronExpression } from '@nestjs/schedule'; -import * as fs from 'fs/promises'; -import * as path from 'path'; -import { UploadService } from './upload.service'; - -@Injectable() -export class CleanService { - private readonly logger = new Logger(CleanService.name); - private readonly tempDir: string; - private readonly TEMP_FILE_EXPIRATION_TIME = 24 * 60 * 60 * 1000; // 24 hours - - constructor(private uploadService: UploadService) { - this.tempDir = process.env.UPLOAD_TEMP_DIR || path.join(process.cwd(), 'uploads', 'temp'); - } - - @Cron(CronExpression.EVERY_6_HOURS) - 
async cleanExpiredTemporaryFiles(): Promise { - try { - const files = await fs.readdir(this.tempDir); - - for (const identifier of files) { - const chunkDir = path.join(this.tempDir, identifier); - - try { - const stat = await fs.stat(chunkDir); - - // 检查目录是否超过过期时间 - const isExpired = (Date.now() - stat.mtime.getTime()) > this.TEMP_FILE_EXPIRATION_TIME; - - // 检查上传是否不在进行中 - const status = this.uploadService.checkUploadStatusInfo(identifier); - const isNotInProgress = !status || - status.status === 'completed' || - status.status === 'error' || - status.status === 'paused'; - - if (isExpired && isNotInProgress) { - await fs.rm(chunkDir, { recursive: true, force: true }); - this.logger.log(`Cleaned up expired temporary files for identifier: ${identifier}`); - this.uploadService.deleteUploadStatusInfo(identifier); - } - } catch (statError) { - // 处理可能的文件系统错误 - this.logger.error(`Error processing directory ${identifier}: ${statError}`); - } - } - } catch (error) { - this.logger.error(`Error during temporary file cleanup: ${error instanceof Error ? error.message : error}`); - } - } -} \ No newline at end of file diff --git a/apps/server/src/upload/tus.service.ts b/apps/server/src/upload/tus.service.ts new file mode 100644 index 0000000..d3e0702 --- /dev/null +++ b/apps/server/src/upload/tus.service.ts @@ -0,0 +1,104 @@ +import { Injectable, OnModuleInit, Logger } from '@nestjs/common'; +import { Server, Upload } from "@nice/tus" +import { FileStore } from '@tus/file-store'; +import { Request, Response } from "express" +import { db, ResourceStatus } from '@nice/common'; +import { getFilenameWithoutExt } from '@server/utils/file'; +import { ResourceService } from '@server/models/resource/resource.service'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; +import { QueueJobType } from '@server/queue/types'; + +// Centralized configuration for file storage +const FILE_UPLOAD_CONFIG = { + directory: "./uploads", + maxSizeBytes: 20_000_000_000, // 20GB + expirationPeriod: 24 * 60 * 60 * 1000 // 24 hours +}; + +@Injectable() +export class TusService implements OnModuleInit { + private readonly logger = new Logger(TusService.name); + private tusServer: Server; + + constructor(private readonly resourceService: ResourceService, + @InjectQueue("file-queue") private fileQueue: Queue + ) { } + + onModuleInit() { + this.initializeTusServer(); + this.setupTusEventHandlers(); + } + + private initializeTusServer() { + this.tusServer = new Server({ + path: '/upload', + datastore: new FileStore({ + directory: FILE_UPLOAD_CONFIG.directory, + expirationPeriodInMilliseconds: FILE_UPLOAD_CONFIG.expirationPeriod + }), + maxSize: FILE_UPLOAD_CONFIG.maxSizeBytes, + postReceiveInterval: 1000, + getFileIdFromRequest: (_, lastPath) => lastPath + }); + } + + private setupTusEventHandlers() { + this.tusServer.on("POST_CREATE", this.handleUploadCreate.bind(this)); + this.tusServer.on("POST_FINISH", this.handleUploadFinish.bind(this)); + } + + private async handleUploadCreate(req: Request, res: Response, upload: Upload, url: string) { + try { + await this.resourceService.create({ + data: { + title: getFilenameWithoutExt(upload.metadata.filename), + filename: upload.metadata.filename, + fileId: upload.id, + url, + metadata: upload.metadata, + status: ResourceStatus.UPLOADING + } + }); + } catch (error) { + this.logger.error('Failed to create resource during upload', error); + } + } + + private async handleUploadFinish(req: Request, res: 
Response, upload: Upload) { + try { + const resource = await this.resourceService.update({ + where: { fileId: upload.id }, + data: { status: ResourceStatus.UPLOADED } + }); + this.fileQueue.add(QueueJobType.FILE_PROCESS, { resource }, { jobId: resource.id }) + this.logger.log('Upload finished', { resourceId: resource.id }); + } catch (error) { + this.logger.error('Failed to update resource after upload', error); + } + } + + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) + async cleanupExpiredUploads() { + try { + // Delete incomplete uploads older than 24 hours + const deletedResources = await db.resource.deleteMany({ + where: { + createdAt: { lt: new Date(Date.now() - FILE_UPLOAD_CONFIG.expirationPeriod) }, + status: ResourceStatus.UPLOADING + } + }); + + const expiredUploadCount = await this.tusServer.cleanUpExpiredUploads(); + + this.logger.log(`Cleanup complete: ${deletedResources.count} resources and ${expiredUploadCount} uploads removed`); + } catch (error) { + this.logger.error('Expired uploads cleanup failed', error); + } + } + + async handleTus(req: Request, res: Response) { + return this.tusServer.handle(req, res); + } +} \ No newline at end of file diff --git a/apps/server/src/upload/upload-lock.service.ts b/apps/server/src/upload/upload-lock.service.ts deleted file mode 100644 index 4d5d136..0000000 --- a/apps/server/src/upload/upload-lock.service.ts +++ /dev/null @@ -1,88 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; -import { Cron, CronExpression } from '@nestjs/schedule'; // 注意这里是 @Cron -import { UploadLock } from './types'; -import { UploadLockInfo } from '@nice/common'; - -@Injectable() -export class UploadLockService { - private readonly logger = new Logger(UploadLockService.name); - private readonly uploadLocks: Map = new Map(); - private readonly MAX_UPLOAD_DURATION = 2 * 60 * 60 * 1000; // 2 hours - - acquireLock(identifier: string, clientId: string): boolean { - if (!identifier || !clientId) { - this.logger.warn('Invalid lock parameters'); - return false; - } - const now = Date.now(); - const existingLock = this.uploadLocks.get(identifier); - // Check if lock exists and is not expired - if (existingLock) { - const lockDuration = now - existingLock.timestamp; - - // More robust lock conflict check - if ( - existingLock.clientId !== clientId && - lockDuration < this.MAX_UPLOAD_DURATION - ) { - this.logger.warn(`Upload conflict: File ${identifier} is locked by another client`); - return false; - } - } - - // Acquire or update the lock - this.uploadLocks.set(identifier, { clientId, timestamp: now }); - return true; - } - - releaseLock(identifier: string, clientId: string): boolean { - if (!identifier || !clientId) { - this.logger.warn('Invalid unlock parameters'); - return false; - } - - const currentLock = this.uploadLocks.get(identifier); - - // Only allow release by the current lock owner - if (currentLock?.clientId === clientId) { - this.uploadLocks.delete(identifier); - return true; - } - - this.logger.warn(`Unauthorized lock release attempt for ${identifier}`); - return false; - } - - checkLock(identifier: string): UploadLockInfo { - const now = Date.now(); - const lockInfo = this.uploadLocks.get(identifier); - // Check if lock exists and is not expired - if (lockInfo && now - lockInfo.timestamp < this.MAX_UPLOAD_DURATION) { - return { - isLocked: true, - lockedBy: lockInfo.clientId - }; - } - - return { isLocked: false }; - } - - @Cron(CronExpression.EVERY_HOUR) - cleanupExpiredLocks(): number { - const now = Date.now(); - let removedLocksCount = 0; - 
- for (const [identifier, lock] of this.uploadLocks.entries()) { - if (now - lock.timestamp > this.MAX_UPLOAD_DURATION) { - this.uploadLocks.delete(identifier); - removedLocksCount++; - } - } - - if (removedLocksCount > 0) { - this.logger.log(`Cleaned up ${removedLocksCount} expired locks`); - } - - return removedLocksCount; - } -} \ No newline at end of file diff --git a/apps/server/src/upload/upload.controller.ts b/apps/server/src/upload/upload.controller.ts index 0fa3656..84e993c 100644 --- a/apps/server/src/upload/upload.controller.ts +++ b/apps/server/src/upload/upload.controller.ts @@ -1,54 +1,111 @@ import { Controller, - Post, - UseInterceptors, - UploadedFile, - Body, - Param, + All, + Req, + Res, Get, + Post, + Patch, + Param, + Delete, } from '@nestjs/common'; -import { FileInterceptor } from '@nestjs/platform-express'; -import { UploadService } from './upload.service'; -import { ChunkDto } from '@nice/common'; +import { Request, Response } from "express" +import { TusService } from './tus.service'; @Controller('upload') export class UploadController { - constructor(private readonly uploadService: UploadService) { } - - @Post('chunk') - @UseInterceptors(FileInterceptor('file')) - async uploadChunk( - @Body('chunk') chunkString: string, // 改为接收字符串 - @UploadedFile() file: Express.Multer.File, - @Body('clientId') clientId: string - ) { - const chunk = JSON.parse(chunkString); // 解析字符串为对象 - await this.uploadService.uploadChunk(chunk, file, clientId); - return { message: 'Chunk uploaded successfully' }; + constructor(private readonly tusService: TusService) { } + @Post() + async handlePost(@Req() req: Request, @Res() res: Response) { + return this.tusService.handleTus(req, res); } - @Get('status/:identifier') - checkUploadStatusInfo(@Param('identifier') identifier: string) { - const status = this.uploadService.checkUploadStatusInfo(identifier); - return status || { message: 'No upload status found' }; - } - @Post('pause/:identifier') - pauseUpload( - @Param('identifier') identifier: string, - @Body('clientId') clientId: string + @Patch(':fileId') // 添加文件ID参数 + async handlePatch( + @Req() req: Request, + @Res() res: Response, + @Param('fileId') fileId: string // 添加文件ID参数 ) { - this.uploadService.pauseUpload(identifier, clientId); - return { message: 'Upload paused successfully' }; - } - - @Post('resume/:identifier') - async resumeUpload( - @Param('identifier') identifier: string, - @Body('clientId') clientId: string - ) { - const resumed = this.uploadService.resumeUpload(identifier, clientId); - if (!resumed) { - throw new Error('Unable to resume upload'); + try { + // 添加错误处理和日志 + const result = await this.tusService.handleTus(req, res); + return result; + } catch (error: any) { + console.error('Upload PATCH error:', error); + res.status(500).json({ + message: 'Upload failed', + error: error.message + }); } - return { message: 'Upload resumed successfully' }; } + @Delete(':fileId') + async handleDelete( + @Req() req: Request, + @Res() res: Response, + @Param('fileId') fileId: string + ) { + try { + const result = await this.tusService.handleTus(req, res); + return result; + } catch (error: any) { + console.error('Upload DELETE error:', error); + res.status(500).json({ + message: 'Delete failed', + error: error.message + }); + } + } + + @Get(':fileId') + async handleGet( + @Req() req: Request, + @Res() res: Response, + @Param('fileId') fileId: string + ) { + try { + const result = await this.tusService.handleTus(req, res); + return result; + } catch (error: any) { + 
console.error('Upload GET error:', error); + res.status(500).json({ + message: 'Retrieve failed', + error: error.message + }); + } + } + // @Post('chunk') + // @UseInterceptors(FileInterceptor('file')) + // async uploadChunk( + // @Body('chunk') chunkString: string, // 改为接收字符串 + // @UploadedFile() file: Express.Multer.File, + // @Body('clientId') clientId: string + // ) { + // const chunk = JSON.parse(chunkString); // 解析字符串为对象 + // await this.uploadService.uploadChunk(chunk, file, clientId); + // return { message: 'Chunk uploaded successfully' }; + // } + // @Get('status/:identifier') + // checkUploadStatusInfo(@Param('identifier') identifier: string) { + // const status = this.uploadService.checkUploadStatusInfo(identifier); + // return status || { message: 'No upload status found' }; + // } + // @Post('pause/:identifier') + // pauseUpload( + // @Param('identifier') identifier: string, + // @Body('clientId') clientId: string + // ) { + // this.uploadService.pauseUpload(identifier, clientId); + // return { message: 'Upload paused successfully' }; + // } + + // @Post('resume/:identifier') + // async resumeUpload( + // @Param('identifier') identifier: string, + // @Body('clientId') clientId: string + // ) { + // const resumed = this.uploadService.resumeUpload(identifier, clientId); + // if (!resumed) { + // throw new Error('Unable to resume upload'); + // } + // return { message: 'Upload resumed successfully' }; + // } } \ No newline at end of file diff --git a/apps/server/src/upload/upload.module.ts b/apps/server/src/upload/upload.module.ts index 863925c..0a54ccb 100644 --- a/apps/server/src/upload/upload.module.ts +++ b/apps/server/src/upload/upload.module.ts @@ -1,17 +1,17 @@ import { Module } from '@nestjs/common'; import { UploadController } from './upload.controller'; -import { UploadService } from './upload.service'; import { BullModule } from '@nestjs/bullmq'; -import { UploadLockService } from './upload-lock.service'; -import { ChunkManager } from './chunk.manager'; +import { TusService } from './tus.service'; +import { ResourceModule } from '@server/models/resource/resource.module'; @Module({ imports: [ BullModule.registerQueue({ name: 'file-queue', // 确保这个名称与 service 中注入的队列名称一致 }), + ResourceModule ], controllers: [UploadController], - providers: [UploadService, UploadLockService, ChunkManager], + providers: [TusService], }) export class UploadModule { } \ No newline at end of file diff --git a/apps/server/src/upload/upload.service.ts b/apps/server/src/upload/upload.service.ts deleted file mode 100644 index 269b7e9..0000000 --- a/apps/server/src/upload/upload.service.ts +++ /dev/null @@ -1,285 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; -import { ChunkDto, UploadStatusInfo, UploadProgress, UploadStatusInfoDto } from '@nice/common'; -import * as fs from 'fs/promises'; -import * as path from 'path'; -import mitt from 'mitt'; -import { ChunkManager } from './chunk.manager'; -import { UploadLockService } from './upload-lock.service'; -import { calculateFileHash } from '@server/utils/file'; -import { UploadEvent } from './types'; -@Injectable() -export class UploadService { - private readonly logger = new Logger(UploadService.name); - private readonly uploadDir: string; - private readonly tempDir: string; - private readonly fileStatuses: Map = new Map(); - private readonly emitter = mitt(); - // Performance Optimization: Configurable upload parameters - private MAX_CONCURRENT_UPLOADS = 5; // Configurable concurrent uploads - private MAX_CHUNK_SIZE = 10 * 1024 * 1024; 
// 10MB max chunk size - private UPLOAD_TIMEOUT = 30 * 60 * 1000; // 30 minutes timeout - constructor( - private chunkManager: ChunkManager, - private uploadLockService: UploadLockService - ) { - // Validate upload directory configuration - this.uploadDir = this.validateUploadDirectory(); - this.tempDir = path.join(this.uploadDir, 'temp'); - this.initDirectories().catch(error => { - this.logger.error(`Failed to initialize upload directories: ${error.message}`); - process.exit(1); - }); - this.configureUploadLimits(); - } - private validateUploadDirectory(): string { - const uploadDir = process.env.UPLOAD_DIR; - if (!uploadDir) { - throw new Error('UPLOAD_DIR environment variable is not set'); - } - return uploadDir; - } - - private handleUploadError(identifier: string, error: unknown): void { - const status = this.fileStatuses.get(identifier); - const errorMessage = error instanceof Error ? error.message : String(error); - if (status) { - status.status = 'error'; - status.error = errorMessage; - } - this.logger.error(`Upload error for ${identifier}: ${errorMessage}`); - this.emitter.emit('uploadError', { - identifier, - error: errorMessage, - filename: status?.filename - }); - // Safe cleanup of temporary files - this.chunkManager.cleanup(identifier).catch(cleanupError => - this.logger.error(`Cleanup failed for ${identifier}: ${cleanupError}`) - ); - } - // Improved directory initialization with better error handling - private async initDirectories(): Promise { - try { - await fs.mkdir(this.uploadDir, { recursive: true }); - await fs.mkdir(this.tempDir, { recursive: true }); - } catch (error) { - this.logger.error(`Directory initialization failed: ${error}`); - throw error; - } - } - private configureUploadLimits(): void { - const maxUploads = parseInt(process.env.MAX_CONCURRENT_UPLOADS || '5', 10); - const maxChunkSize = parseInt(process.env.MAX_CHUNK_SIZE || '10485760', 10); - this.MAX_CONCURRENT_UPLOADS = maxUploads; - this.MAX_CHUNK_SIZE = maxChunkSize; - } - // Enhanced input validation - async uploadChunk(chunk: ChunkDto, file: Express.Multer.File, clientId: string): Promise { - // Validate chunk size - if (chunk.currentChunkSize > this.MAX_CHUNK_SIZE) { - throw new Error(`Chunk size exceeds maximum limit of ${this.MAX_CHUNK_SIZE} bytes`); - } - - // Rate limiting and concurrent upload control - await this.controlConcurrentUploads(); - - const { identifier } = chunk; - - const lockAcquired = this.uploadLockService.acquireLock(identifier, clientId); - - if (!lockAcquired) { - throw new Error('Concurrent upload limit reached'); - } - - try { - // Add timeout mechanism - const uploadPromise = this.processChunkUpload(chunk, file); - const timeoutPromise = new Promise((_, reject) => - setTimeout(() => reject(new Error('Upload timeout')), this.UPLOAD_TIMEOUT) - ); - - await Promise.race([uploadPromise, timeoutPromise]); - } catch (error) { - this.handleUploadError(identifier, error); - } finally { - this.uploadLockService.releaseLock(identifier, clientId); - } - } - private async controlConcurrentUploads(): Promise { - const activeUploads = Array.from(this.fileStatuses.values()) - .filter(status => status.status === 'uploading').length; - - if (activeUploads >= this.MAX_CONCURRENT_UPLOADS) { - await new Promise(resolve => setTimeout(resolve, 1000)); // Wait and retry - await this.controlConcurrentUploads(); - } - } - - private async processChunkUpload(chunk: ChunkDto, file: Express.Multer.File): Promise { - const { identifier } = chunk; - if (!this.fileStatuses.has(identifier)) { - await 
this.initUploadStatusInfo(chunk); - } - const status = this.fileStatuses.get(identifier); - if (!status) { - throw new Error('File status initialization failed'); - } - if (status.chunks.has(chunk.chunkNumber)) { - return; - } - await this.chunkManager.save(chunk, file); - this.updateProgress(chunk); - - if (this.isUploadComplete(identifier)) { - await this.finalizeUpload(chunk); - } - } - - private async initUploadStatusInfo(chunk: ChunkDto): Promise { - const { identifier, filename, totalSize } = chunk; - - // 获取已经上传的chunks - const uploadedChunks = await this.chunkManager.getChunks(identifier); - const uploadedSize = uploadedChunks.length * chunk.currentChunkSize; - - this.emitter.emit('uploadStart', { - identifier, - filename, - totalSize, - resuming: uploadedChunks.length > 0 - }); - - this.fileStatuses.set(identifier, { - identifier, - filename, - totalSize, - uploadedSize, - status: 'uploading', - chunks: new Set(uploadedChunks), // 初始化已上传的chunks - startTime: Date.now(), - lastUpdateTime: Date.now() - }); - } - private updateProgress(chunk: ChunkDto): void { - const status = this.fileStatuses.get(chunk.identifier); - if (!status) return; - // Use more efficient progress calculation - const newUploadedSize = chunk.chunkNumber * chunk.currentChunkSize; - const progressPercentage = Math.min( - Math.round((newUploadedSize / status.totalSize) * 100), - 100 - ); - status.chunks.add(chunk.chunkNumber); - status.uploadedSize = newUploadedSize; - status.lastUpdateTime = Date.now(); - const progress: UploadProgress = { - identifier: chunk.identifier, - percentage: progressPercentage, - uploadedSize: newUploadedSize, - totalSize: status.totalSize, - speed: this.calculateSpeed(status), - remainingTime: this.calculateRemainingTime(status) - }; - status.progress = progress - } - - private calculateRemainingTime(status: UploadStatusInfo): number { - const speed = this.calculateSpeed(status); - if (speed === 0) return 0; - const remainingBytes = status.totalSize - status.uploadedSize; - return Math.ceil(remainingBytes / speed); // Returns seconds remaining - } - private calculateSpeed(status: UploadStatusInfo): number { - const duration = (status.lastUpdateTime - status.startTime) / 1000; // in seconds - return duration > 0 ? 
Math.round(status.uploadedSize / duration) : 0; // bytes per second - } - private async finalizeUpload(chunk: ChunkDto): Promise { - const { identifier, filename, totalChunks, checksum } = chunk; - const chunkDir = path.join(this.tempDir, identifier); - const finalPath = path.join(this.uploadDir, filename); - try { - await this.chunkManager.merge(chunkDir, finalPath, totalChunks); - // Calculate file hash - const calculatedHash = await calculateFileHash(finalPath); - // Verify file integrity - if (checksum && calculatedHash !== checksum) { - throw new Error('File integrity check failed: Checksums do not match'); - } - await this.chunkManager.cleanup(identifier); - const status = this.fileStatuses.get(identifier); - if (status) { - status.status = 'completed'; - status.hash = calculatedHash; - } - this.emitter.emit('uploadComplete', { - identifier, - filename, - size: status.totalSize, - hash: calculatedHash, - integrityVerified: !checksum || calculatedHash === checksum - }); - } catch (error) { - this.handleUploadError(identifier, error); - } - } - private isUploadComplete(identifier: string): boolean { - const status = this.fileStatuses.get(identifier); - if (!status) return false; - - return status.uploadedSize === status.totalSize; - } - - deleteUploadStatusInfo(identifier: string): boolean { - // Check if the file status exists - const status = this.fileStatuses.get(identifier); - if (!status) { - // If the status doesn't exist, return false - return false; - } - // Check if the upload is still in progress - if (status.status === 'uploading') { - this.logger.warn(`Attempting to delete file status for ongoing upload: ${identifier}`); - return false; - } - // Remove the file status from the map - const deleted = this.fileStatuses.delete(identifier); - if (deleted) { - this.logger.log(`File status deleted for identifier: ${identifier}`); - } - return deleted; - } - checkUploadStatusInfo(identifier: string): UploadStatusInfoDto { - const lockInfo = this.uploadLockService.checkLock(identifier); - const statusInfo = { - ...lockInfo, - ...this.fileStatuses.get(identifier) - }; - return statusInfo || null - } - pauseUpload(identifier: string, clientId: string): void { - const status = this.fileStatuses.get(identifier); - if (status) { - status.status = 'paused'; - this.uploadLockService.releaseLock(identifier, clientId); - } - } - resumeUpload(identifier: string, clientId: string): boolean { - const status = this.fileStatuses.get(identifier); - if (status) { - // Try to reacquire the lock - const lockAcquired = this.uploadLockService.acquireLock(identifier, clientId); - if (lockAcquired) { - status.status = 'uploading'; - return true; - } - } - return false; - } - onUploadEvent( - event: K, - handler: (data: UploadEvent[K]) => void - ): () => void { - this.emitter.on(event, handler); - return () => this.emitter.off(event, handler); - } -} \ No newline at end of file diff --git a/apps/web/src/app/main/home/page.tsx b/apps/web/src/app/main/home/page.tsx index 577da43..f7db195 100644 --- a/apps/web/src/app/main/home/page.tsx +++ b/apps/web/src/app/main/home/page.tsx @@ -1,82 +1,81 @@ -import React, { useState } from 'react'; -import { useUpload } from '@nice/client'; // Assuming the previous hook is in this file +import React, { useState, useCallback } from 'react'; +import * as tus from 'tus-js-client'; -const FileUploadComponent: React.FC = () => { - const [selectedFiles, setSelectedFiles] = useState([]); +interface TusUploadProps { + onSuccess?: (response: any) => void; + onError?: (error: 
Error) => void;
+}
-  const {
-    upload,
-    pauseUpload,
-    resumeUpload,
-    progress,
-    errors
-  } = useUpload({
-    // Optional configuration
-    baseUrl: "http://localhost:3000/upload",
-    onProgress: (progressInfo) => {
-      console.log('Upload progress:', progressInfo);
-    },
-    onError: (error) => {
-      console.error('Upload error:', error);
-    }
-  });
+const TusUploader: React.FC<TusUploadProps> = ({
+  onSuccess,
+  onError
+}) => {
+  const [progress, setProgress] = useState(0);
+  const [isUploading, setIsUploading] = useState(false);
+  const [uploadError, setUploadError] = useState<string | null>(null);
-  const handleFileSelect = (event: React.ChangeEvent<HTMLInputElement>) => {
-    if (event.target.files) {
-      setSelectedFiles(Array.from(event.target.files));
-    }
-  };
+  const handleFileUpload = useCallback((file: File) => {
+    if (!file) return;
-  const handleUpload = async () => {
-    try {
-      await upload(selectedFiles);
-      alert('Upload completed successfully!');
-    } catch (error) {
-      console.error('Upload failed:', error);
-    }
-  };
+    setIsUploading(true);
+    setProgress(0);
+    setUploadError(null);
-  const renderProgressBar = (fileName: string) => {
-    const fileProgress = progress[fileName];
-    if (!fileProgress) return null;
+    // Extract file extension
+    const extension = file.name.split('.').pop() || '';
-    return (
-      <div>
-        <div>{fileName}</div>
-        <span>{fileProgress.percentage.toFixed(2)}%</span>
-        {errors[fileName] && (
-          <div>
-            Error: {errors[fileName].message}
-          </div>
-        )}
-      </div>
- ); - }; + const upload = new tus.Upload(file, { + endpoint: "http://localhost:3000/upload", + retryDelays: [0, 1000, 3000, 5000], + metadata: { + filename: file.name, + // New metadata fields + size: file.size.toString(), + mimeType: file.type, + extension: extension, + modifiedAt: new Date(file.lastModified).toISOString(), + }, + onProgress: (bytesUploaded, bytesTotal) => { + const percentage = ((bytesUploaded / bytesTotal) * 100).toFixed(2); + setProgress(Number(percentage)); + }, + onSuccess: () => { + setIsUploading(false); + setProgress(100); + onSuccess && onSuccess(upload); + }, + onError: (error) => { + setIsUploading(false); + setUploadError(error.message); + onError && onError(error); + } + }); + + upload.start(); + }, [onSuccess, onError]); return (
     <div>
+      <input type="file" onChange={(e) => {
+        const file = e.target.files?.[0];
+        if (file) handleFileUpload(file);
+      }} />
-      <div>
-        Upload Progress
-        {selectedFiles.map(file => renderProgressBar(file.name))}
-      </div>
+      {isUploading && (
+        <div>
+          {progress}%
+        </div>
+      )}
+      {uploadError && (
+        <div>
+          上传错误: {uploadError}
+        </div>
+      )}
     </div>
   );
 };
-export default FileUploadComponent;
\ No newline at end of file
+export default TusUploader;
\ No newline at end of file
diff --git a/packages/client/src/tools/index.ts b/packages/client/src/tools/index.ts
index fadfe1d..df5fddc 100644
--- a/packages/client/src/tools/index.ts
+++ b/packages/client/src/tools/index.ts
@@ -1,4 +1,5 @@
 export * from "./level"
 export * from "./objects"
 export * from "./number"
-export * from "./file"
\ No newline at end of file
+export * from "./file"
+
diff --git a/packages/common/prisma/schema.prisma b/packages/common/prisma/schema.prisma
index bdde991..81c00e9 100644
--- a/packages/common/prisma/schema.prisma
+++ b/packages/common/prisma/schema.prisma
@@ -430,33 +430,32 @@ model CourseInstructor {
 }
 model Resource {
-  id String @id @default(cuid()) @map("id")
-  title String? @map("title")
-  description String? @map("description")
-  type String? @map("type")
+  id String @id @default(cuid()) @map("id")
+  title String? @map("title")
+  description String? @map("description")
+  type String? @map("type")
   // 存储信息
-  filename String?
-  fileId String?
-  url String?
-  hash String?
+  filename String?
+  fileId String? @unique
+  url String?
   // 元数据
-  metadata Json? @map("metadata")
+  metadata Json? @map("metadata")
   // 处理状态控制
-  processStatus String?
-
+  status String?
   // 审计字段
-  createdAt DateTime? @default(now()) @map("created_at")
-  updatedAt DateTime? @updatedAt @map("updated_at")
-  createdBy String? @map("created_by")
-  updatedBy String? @map("updated_by")
-  deletedAt DateTime? @map("deleted_at")
-  isPublic Boolean? @default(true) @map("is_public")
-  owner Staff? @relation(fields: [ownerId], references: [id])
-  ownerId String? @map("owner_id")
-  post Post? @relation(fields: [postId], references: [id])
-  postId String? @map("post_id")
-  lecture Lecture? @relation(fields: [lectureId], references: [id])
-  lectureId String? @map("lecture_id")
+  createdAt DateTime? @default(now()) @map("created_at")
+  updatedAt DateTime? @updatedAt @map("updated_at")
+  createdBy String? @map("created_by")
+  updatedBy String? @map("updated_by")
+  deletedAt DateTime? @map("deleted_at")
+  isPublic Boolean? @default(true) @map("is_public")
+
+  owner Staff? @relation(fields: [ownerId], references: [id])
+  ownerId String? @map("owner_id")
+  post Post? @relation(fields: [postId], references: [id])
+  postId String? @map("post_id")
+  lecture Lecture? @relation(fields: [lectureId], references: [id])
+  lectureId String?
@map("lecture_id") // 索引 @@index([type]) @@ -464,7 +463,6 @@ model Resource { @@map("resource") } - model Node { id String @id @default(cuid()) @map("id") title String @map("title") diff --git a/packages/common/src/enum.ts b/packages/common/src/enum.ts index 1581857..92084ab 100755 --- a/packages/common/src/enum.ts +++ b/packages/common/src/enum.ts @@ -25,11 +25,13 @@ export enum StorageProvider { CDN = 'CDN' } -export enum ResourceProcessStatus { - PENDING = 'PENDING', +export enum ResourceStatus { + UPLOADING = "UPLOADING", + UPLOADED = "UPLOADED", + PROCESS_PENDING = 'PROCESS_PENDING', PROCESSING = 'PROCESSING', - SUCCESS = 'SUCCESS', - FAILED = 'FAILED', + PROCESSED = 'PROCESSED', + PROCESS_FAILED = 'PROCESS_FAILED' } export enum ObjectType { DEPARTMENT = "department", diff --git a/packages/tus/src/handlers/BaseHandler.ts b/packages/tus/src/handlers/BaseHandler.ts index 0f55a14..2315295 100644 --- a/packages/tus/src/handlers/BaseHandler.ts +++ b/packages/tus/src/handlers/BaseHandler.ts @@ -91,11 +91,9 @@ export class BaseHandler extends EventEmitter { */ generateUrl(req: http.IncomingMessage, id: string) { const path = this.options.path === '/' ? '' : this.options.path - if (this.options.generateUrl) { // 使用用户定义的 generateUrl 函数生成 URL const { proto, host } = this.extractHostAndProto(req) - return this.options.generateUrl(req, { proto, host, @@ -247,7 +245,6 @@ export class BaseHandler extends EventEmitter { const proxy = new PassThrough() // 将取消信号与代理流关联,以便在取消时自动中止流。 addAbortSignal(context.signal, proxy) - // 监听代理流的错误事件,处理流中的错误。 proxy.on('error', (err) => { // 取消请求流与代理流的管道连接。 @@ -255,7 +252,6 @@ export class BaseHandler extends EventEmitter { // 如果错误是 AbortError,则返回 ABORTED 错误,否则返回原始错误。 reject(err.name === 'AbortError' ? ERRORS.ABORTED : err) }) - // 使用 throttle 函数创建一个节流函数,用于定期触发 POST_RECEIVE_V2 事件。 // 该事件用于通知上传进度,避免频繁触发事件导致性能问题。 const postReceive = throttle( @@ -267,7 +263,6 @@ export class BaseHandler extends EventEmitter { this.options.postReceiveInterval, { leading: false } ) - // 临时变量,用于跟踪当前写入的偏移量。 let tempOffset = upload.offset // 监听代理流的 data 事件,每当有数据块通过时更新偏移量并触发进度事件。 @@ -275,7 +270,6 @@ export class BaseHandler extends EventEmitter { tempOffset += chunk.byteLength postReceive(tempOffset) }) - // 监听请求流的 error 事件,处理请求流中的错误。 req.on('error', () => { // 如果代理流未关闭,则优雅地结束流,以便将剩余的字节作为 incompletePart 上传到存储。 @@ -283,7 +277,6 @@ export class BaseHandler extends EventEmitter { proxy.end() } }) - // 使用 stream.pipeline 将请求流通过代理流和 StreamLimiter 传输到存储系统。 // StreamLimiter 用于限制写入的数据量,确保不超过最大文件大小。 stream diff --git a/packages/tus/src/handlers/GetHandler.ts b/packages/tus/src/handlers/GetHandler.ts index 2fbb5d0..4dc3e8a 100644 --- a/packages/tus/src/handlers/GetHandler.ts +++ b/packages/tus/src/handlers/GetHandler.ts @@ -19,7 +19,7 @@ import { ERRORS, Upload } from '../utils' */ export class GetHandler extends BaseHandler { // 使用Map存储路径与处理函数的映射关系,提供O(1)的查找时间复杂度 - paths: Map = new Map() + paths: Map = new Map() /** * 正则表达式用于验证MIME类型是否符合RFC1341规范 * 支持带参数的MIME类型,如:text/plain; charset=utf-8 diff --git a/packages/tus/src/handlers/PatchHandler.ts b/packages/tus/src/handlers/PatchHandler.ts index b80cc0f..b6b4aed 100644 --- a/packages/tus/src/handlers/PatchHandler.ts +++ b/packages/tus/src/handlers/PatchHandler.ts @@ -172,25 +172,47 @@ export class PatchHandler extends BaseHandler { } // 处理上传完成事件 + // 文件上传完成后的处理逻辑块 if (newOffset === upload.size && this.options.onUploadFinish) { try { + // 调用上传完成回调函数,支持异步处理 + // 允许用户自定义上传完成后的处理逻辑 const resOrObject = await this.options.onUploadFinish(req, res, upload) - // 
处理旧版兼容性 + + // 兼容性处理:支持两种返回类型 + // 1. 直接返回 http.ServerResponse 对象 + // 2. 返回包含自定义响应信息的对象 if ( + // 检查是否为标准 ServerResponse 对象 typeof (resOrObject as http.ServerResponse).write === 'function' && typeof (resOrObject as http.ServerResponse).writeHead === 'function' ) { + // 直接使用返回的服务器响应对象 res = resOrObject as http.ServerResponse } else { + // 处理自定义响应对象的类型定义 + // 排除 ServerResponse 类型,确保类型安全 type ExcludeServerResponse = T extends http.ServerResponse ? never : T + + // 将返回对象转换为自定义响应对象 const obj = resOrObject as ExcludeServerResponse + + // 更新响应对象 res = obj.res + + // 可选地更新响应状态码 if (obj.status_code) responseData.status = obj.status_code + + // 可选地更新响应体 if (obj.body) responseData.body = obj.body + + // 合并响应头,允许覆盖默认头 if (obj.headers) responseData.headers = Object.assign(obj.headers, responseData.headers) } } catch (error: any) { + // 错误处理:记录上传完成回调中的错误 + // 使用日志记录错误信息,并重新抛出异常 log(`onUploadFinish: ${error.body}`) throw error } diff --git a/packages/tus/src/handlers/PostHandler.ts b/packages/tus/src/handlers/PostHandler.ts index e31d333..fb733db 100644 --- a/packages/tus/src/handlers/PostHandler.ts +++ b/packages/tus/src/handlers/PostHandler.ts @@ -70,7 +70,7 @@ export class PostHandler extends BaseHandler { if ((upload_length === undefined) === (upload_defer_length === undefined)) { throw ERRORS.INVALID_LENGTH } - + let metadata: ReturnType<(typeof Metadata)['parse']> | undefined if ('upload-metadata' in req.headers) { try { @@ -169,33 +169,52 @@ export class PostHandler extends BaseHandler { } finally { await lock.unlock() } - + // 上传完成后的处理逻辑 if (isFinal && this.options.onUploadFinish) { try { + // 调用自定义的上传完成回调函数,传入请求、响应和上传对象 + // 允许用户自定义上传完成后的处理逻辑 const resOrObject = await this.options.onUploadFinish(req, res, upload) - // 向后兼容,将在下一个主要版本中移除 - // 由于在测试中模拟了实例,因此无法使用 `instanceof` 进行检查 + + // 兼容性处理:检查返回值是否为 HTTP 响应对象 + // 通过检查对象是否具有 write 和 writeHead 方法来判断 if ( typeof (resOrObject as http.ServerResponse).write === 'function' && typeof (resOrObject as http.ServerResponse).writeHead === 'function' ) { + // 如果直接返回 HTTP 响应对象,直接覆盖原响应对象 res = resOrObject as http.ServerResponse } else { - // 由于 TS 只理解 instanceof,因此类型定义较为丑陋 + // 处理自定义返回对象的情况 + // 使用复杂的类型定义排除 ServerResponse 类型 type ExcludeServerResponse = T extends http.ServerResponse ? 
never : T + + // 将返回对象转换为非 ServerResponse 类型 const obj = resOrObject as ExcludeServerResponse + + // 更新响应对象 res = obj.res + + // 根据返回对象更新响应状态码 if (obj.status_code) responseData.status = obj.status_code + + // 更新响应体 if (obj.body) responseData.body = obj.body + + // 合并响应头,允许覆盖默认头 if (obj.headers) responseData.headers = Object.assign(obj.headers, responseData.headers) } } catch (error: any) { + // 记录上传完成回调中的错误 log(`onUploadFinish: ${error.body}`) + + // 抛出错误,中断上传流程 throw error } } + // Upload-Expires 响应头指示未完成的上传何时过期。 // 如果在创建时已知过期时间,则必须在响应中包含 Upload-Expires 头 if ( diff --git a/packages/tus/src/index.ts b/packages/tus/src/index.ts index e44ca96..e9ec3b0 100644 --- a/packages/tus/src/index.ts +++ b/packages/tus/src/index.ts @@ -1,3 +1,4 @@ export { Server } from './server' export * from './types' export * from './lockers' +export * from './utils' \ No newline at end of file diff --git a/packages/tus/src/server.ts b/packages/tus/src/server.ts index e6877a6..c0009cb 100644 --- a/packages/tus/src/server.ts +++ b/packages/tus/src/server.ts @@ -226,13 +226,11 @@ export class Server extends EventEmitter { // biome-ignore lint/suspicious/noConfusingVoidType: it's fine ): Promise { const context = this.createContext(req) - log(`[TusServer] handle: ${req.method} ${req.url}`) // 允许覆盖 HTTP 方法。这样做的原因是某些库/环境不支持 PATCH 和 DELETE 请求,例如浏览器中的 Flash 和 Java 部分环境 if (req.headers['x-http-method-override']) { req.method = (req.headers['x-http-method-override'] as string).toUpperCase() } - const onError = async (error: { status_code?: number body?: string @@ -240,7 +238,6 @@ export class Server extends EventEmitter { }) => { let status_code = error.status_code || ERRORS.UNKNOWN_ERROR.status_code let body = error.body || `${ERRORS.UNKNOWN_ERROR.body}${error.message || ''}\n` - if (this.options.onResponseError) { const errorMapping = await this.options.onResponseError(req, res, error as Error) if (errorMapping) { @@ -248,36 +245,29 @@ export class Server extends EventEmitter { body = errorMapping.body } } - return this.write(context, req, res, status_code, body) } - if (req.method === 'GET') { const handler = this.handlers.GET return handler.send(req, res).catch(onError) } - // Tus-Resumable 头部必须包含在每个请求和响应中,除了 OPTIONS 请求。其值必须是客户端或服务器使用的协议版本。 res.setHeader('Tus-Resumable', TUS_RESUMABLE) - if (req.method !== 'OPTIONS' && req.headers['tus-resumable'] === undefined) { return this.write(context, req, res, 412, 'Tus-Resumable Required\n') } - // 验证所有必需的头部以符合 tus 协议 const invalid_headers = [] for (const header_name in req.headers) { if (req.method === 'OPTIONS') { continue } - // 内容类型仅对 PATCH 请求进行检查。对于所有其他请求方法,它将被忽略并视为未设置内容类型, // 因为某些 HTTP 客户端可能会为此头部强制执行默认值。 // 参见 https://github.com/tus/tus-node-server/pull/116 if (header_name.toLowerCase() === 'content-type' && req.method !== 'PATCH') { continue } - if (!validateHeader(header_name, req.headers[header_name] as string | undefined)) { log(`Invalid ${header_name} header: ${req.headers[header_name]}`) invalid_headers.push(header_name) @@ -287,15 +277,12 @@ export class Server extends EventEmitter { if (invalid_headers.length > 0) { return this.write(context, req, res, 400, `Invalid ${invalid_headers.join(' ')}\n`) } - // 启用 CORS res.setHeader('Access-Control-Allow-Origin', this.getCorsOrigin(req)) res.setHeader('Access-Control-Expose-Headers', EXPOSED_HEADERS) - if (this.options.allowedCredentials === true) { res.setHeader('Access-Control-Allow-Credentials', 'true') } - // 调用请求方法的处理器 const handler = this.handlers[req.method as keyof Handlers] if (handler) { @@ -306,24 +293,35 @@ 
export class Server extends EventEmitter { } /** - * 获取 CORS 允许的源 - * @param req - HTTP 请求对象 - * @returns 返回允许的源 + * 获取CORS(跨域资源共享)允许的源地址 + * + * 该方法用于确定并返回允许的CORS源地址。首先检查请求头中的`origin`是否在允许的源列表中, + * 如果在则返回该`origin`;如果不在但允许的源列表不为空,则返回列表中的第一个源地址; + * 如果允许的源列表为空,则返回通配符`*`,表示允许所有源地址。 + * + * @param req HTTP请求对象,包含请求头等信息 + * @returns 返回允许的CORS源地址,可能是请求头中的`origin`、允许的源列表中的第一个源地址或通配符`*` + * + * 设计考量: + * - 该方法考虑了CORS策略的灵活性,允许通过配置动态指定允许的源地址。 + * - 通过返回通配符`*`,简化了默认情况下的CORS配置,但需要注意这可能带来安全风险。 */ private getCorsOrigin(req: http.IncomingMessage): string { const origin = req.headers.origin + // 检查请求头中的`origin`是否在允许的源列表中 const isOriginAllowed = this.options.allowedOrigins?.some((allowedOrigin) => allowedOrigin === origin) ?? true - + // 如果`origin`存在且在允许的源列表中,则返回该`origin` if (origin && isOriginAllowed) { return origin } - + // 如果允许的源列表不为空,则返回列表中的第一个源地址 if (this.options.allowedOrigins && this.options.allowedOrigins.length > 0) { return this.options.allowedOrigins[0] } + // 如果允许的源列表为空,则返回通配符`*`,表示允许所有源地址 return '*' } @@ -411,6 +409,7 @@ export class Server extends EventEmitter { const requestAbortController = new AbortController() const abortWithDelayController = new AbortController() + // 当 `abortWithDelayController` 被触发时调用此函数,以在指定延迟后中止请求。 const onDelayedAbort = (err: unknown) => { abortWithDelayController.signal.removeEventListener('abort', onDelayedAbort) setTimeout(() => { @@ -419,10 +418,15 @@ export class Server extends EventEmitter { } abortWithDelayController.signal.addEventListener('abort', onDelayedAbort) + // 当请求关闭时,移除监听器以避免内存泄漏。 req.on('close', () => { abortWithDelayController.signal.removeEventListener('abort', onDelayedAbort) }) + // 返回一个对象,包含信号和两个中止请求的方法。 + // `signal` 用于监听请求中止事件。 + // `abort` 方法用于立即中止请求。 + // `cancel` 方法用于启动延迟中止序列。 return { signal: requestAbortController.signal, abort: () => { diff --git a/packages/tus/src/utils/constants.ts b/packages/tus/src/utils/constants.ts index b1b34b4..4eed674 100644 --- a/packages/tus/src/utils/constants.ts +++ b/packages/tus/src/utils/constants.ts @@ -27,6 +27,7 @@ export const HEADERS = [ 'Forwarded', ] as const + // 将头部信息转换为小写形式,便于处理 export const HEADERS_LOWERCASE = HEADERS.map((header) => { return header.toLowerCase() diff --git a/packages/tus/src/utils/models/Context.ts b/packages/tus/src/utils/models/Context.ts index 89f8f7d..8486fcf 100644 --- a/packages/tus/src/utils/models/Context.ts +++ b/packages/tus/src/utils/models/Context.ts @@ -8,54 +8,7 @@ * - 需要优雅关闭资源连接的场景 */ export interface CancellationContext { - /** - * AbortSignal实例,用于监听取消请求事件。 - * 外部代码可以通过监听该信号来响应取消操作,实现资源的及时释放。 - * - * 设计模式: - * - 采用观察者模式,允许外部订阅取消事件 - * - 与浏览器原生的AbortController API兼容 - */ signal: AbortSignal - - /** - * 立即中止请求的方法。 - * 该方法用于紧急情况下立即终止所有相关操作,适用于: - * - 上传/下载过程中出现不可恢复的错误 - * - 检测到无效请求条件 - * - 需要立即释放系统资源的场景 - * - * 实现要求: - * - 应立即停止所有相关操作 - * - 应释放所有已分配的资源 - * - 应触发signal的abort事件 - * - * 示例: - * const context = createCancellationContext(); - * context.abort(); // 立即终止所有操作 - */ abort: () => void - - /** - * 优雅取消请求的方法。 - * 该方法允许请求在有限时间内完成收尾工作,适用于: - * - 需要完成当前事务的场景 - * - 需要有序关闭资源的场景 - * - 需要确保数据一致性的场景 - * - * 实现要求: - * - 应允许请求在合理时间内完成收尾工作 - * - 应在完成收尾后释放资源 - * - 应触发signal的abort事件 - * - * 示例: - * const context = createCancellationContext(); - * context.cancel(); // 允许请求优雅结束 - * setTimeout(() => { - * if (!context.signal.aborted) { - * context.abort(); // 超时后强制中止 - * } - * }, 5000); - */ cancel: () => void } diff --git a/packages/tus/src/utils/models/DataStore.ts b/packages/tus/src/utils/models/DataStore.ts index d180820..6d36759 100644 --- 
+++ b/packages/tus/src/utils/models/DataStore.ts
@@ -45,7 +45,7 @@ export class DataStore extends EventEmitter {

   /**
    * Called in HEAD requests. This method should return the bytes
-   * writen to the DataStore, for the client to know where to resume
+   * written to the DataStore, for the client to know where to resume
    * the upload.
    */
   async getUpload(id: string): Promise {
diff --git a/packages/tus/src/utils/models/Locker.ts b/packages/tus/src/utils/models/Locker.ts
index 61ce238..8e8629c 100644
--- a/packages/tus/src/utils/models/Locker.ts
+++ b/packages/tus/src/utils/models/Locker.ts
@@ -1,27 +1,11 @@
 export type RequestRelease = () => Promise | void
-/**
- * Locker 接口用于为给定的资源标识符创建一个 Lock 实例。
- * 该接口的主要作用是提供一个锁定机制,确保对资源(如上传和其元数据)的独占访问。
- */
+
 export interface Locker {
   newLock(id: string): Lock
 }
-/**
- * Lock 接口定义了实现锁定机制的方法。
- * 该接口主要用于确保对资源的独占访问,遵循 TUS 协议的建议,强调需要防止长时间保留锁。
- * 这种方法有助于高效管理资源,并避免半开 TCP 连接的问题。
- *
- * 方法:
- * - lock(id, cancelReq): 获取由 'id' 标识的资源的锁。如果锁已被另一个请求持有,
- *   则提供 'cancelReq' 回调以通知当前锁持有者释放锁。
- *   'cancelReq' 回调应在另一个请求尝试获取先前锁定的资源时调用。
- *   这种机制确保锁仅在必要时持有,并迅速释放以供其他请求使用。
- *
- * - unlock(id): 释放由 'id' 标识的资源上的锁。锁持有者应在完成操作后或通过 'cancelReq' 回调接收到信号后调用此方法,
- *   以释放锁。
- */
+
 export interface Lock {
   lock(cancelReq: RequestRelease): Promise
   unlock(): Promise
diff --git a/packages/tus/src/utils/models/StreamSplitter.ts b/packages/tus/src/utils/models/StreamSplitter.ts
index 7a47c1d..4dd94da 100644
--- a/packages/tus/src/utils/models/StreamSplitter.ts
+++ b/packages/tus/src/utils/models/StreamSplitter.ts
@@ -125,10 +125,7 @@ export class StreamSplitter extends stream.Writable {
   async _handleError() {
     await this.emitEvent('chunkError', this.currentChunkPath)
     // 如果发生错误,停止写入操作,防止数据丢失
-    if (this.fileHandle === null) {
-      return
-    }
-
+    if (this.fileHandle === null) { return }
     await this.fileHandle.close()
     this.currentChunkPath = null
     this.fileHandle = null
@@ -139,7 +136,7 @@ export class StreamSplitter extends stream.Writable {
    */
   async _finishChunk(): Promise {
     if (this.fileHandle === null) {
-      return
+      return
     }

     await this.fileHandle.close()
diff --git a/packages/tus/src/utils/models/Upload.ts b/packages/tus/src/utils/models/Upload.ts
index 275df2f..05809ae 100644
--- a/packages/tus/src/utils/models/Upload.ts
+++ b/packages/tus/src/utils/models/Upload.ts
@@ -1,25 +1,49 @@
+/**
+ * 模块: Upload
+ * 文件功能描述: 该模块定义了上传文件的数据模型,包括文件的基本信息和存储信息。
+ * 使用场景: 用于管理文件上传过程中的元数据和状态,适用于需要处理文件上传的Web应用或服务。
+ */
+/**
+ * 类型: TUpload
+ * 核心功能概述: 定义了上传文件的数据结构,包括文件ID、大小、偏移量、元数据、存储信息等。
+ */
 type TUpload = {
-  id: string
-  size?: number
-  offset: number
-  metadata?: Record
-  storage?: {
-    type: string
-    path: string
-    bucket?: string
+  id: string // 文件唯一标识符
+  size?: number // 文件大小,可选
+  offset: number // 文件上传的偏移量
+  metadata?: Record // 文件的元数据,可选
+  storage?: { // 文件的存储信息,可选
+    type: string // 存储类型
+    path: string // 存储路径
+    bucket?: string // 存储桶,可选
   }
-  creation_date?: string
+  creation_date?: string // 文件创建日期,可选
 }

+/**
+ * 类: Upload
+ * 核心功能概述: 封装了上传文件的数据模型,提供了文件信息的初始化和访问方法。
+ * 设计模式解析: 使用构造函数模式初始化对象,通过getter方法提供属性访问。
+ * 使用示例:
+ * const upload = new Upload({ id: '123', size: 1024, offset: 0 });
+ * console.log(upload.sizeIsDeferred); // 检查文件大小是否延迟
+ */
 export class Upload {
-  id: TUpload['id']
-  metadata: TUpload['metadata']
-  size: TUpload['size']
-  offset: TUpload['offset']
-  creation_date: TUpload['creation_date']
-  storage: TUpload['storage']
+  id: TUpload['id'] // 文件ID
+  metadata: TUpload['metadata'] // 文件元数据
+  size: TUpload['size'] // 文件大小
+  offset: TUpload['offset'] // 文件上传偏移量
+  creation_date: TUpload['creation_date'] // 文件创建日期
+  storage: TUpload['storage'] // 文件存储信息

+  /**
+   * 构造函数
+   * 功能详细描述: 初始化Upload对象,检查必要的ID属性,并设置默认的创建日期。
+   * 输入参数解析:
+   * - upload: TUpload类型,包含文件的基本信息和存储信息。
+   * 异常处理机制: 如果未提供ID,则抛出错误。
+   */
   constructor(upload: TUpload) {
     // 检查ID是否存在,不存在则抛出错误
     if (!upload.id) {
@@ -37,7 +61,12 @@ export class Upload {
     this.creation_date = upload.creation_date ?? new Date().toISOString()
   }

+  /**
+   * 方法: sizeIsDeferred
+   * 功能详细描述: 检查文件大小是否未定义,即是否延迟上传。
+   * 返回值说明: 返回布尔值,true表示文件大小未定义,false表示已定义。
+   */
   get sizeIsDeferred(): boolean {
     return this.size === undefined
   }
-}
+}
\ No newline at end of file
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 15d8f97..d18b639 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -40,6 +40,9 @@ importers:
       '@nice/common':
         specifier: workspace:*
         version: link:../../packages/common
+      '@nice/tus':
+        specifier: workspace:*
+        version: link:../../packages/tus
       '@trpc/server':
         specifier: 11.0.0-rc.456
         version: 11.0.0-rc.456
@@ -49,9 +52,6 @@ importers:
       '@tus/s3-store':
         specifier: ^1.6.2
         version: 1.6.2
-      '@tus/server':
-        specifier: ^1.10.0
-        version: 1.10.0
       argon2:
         specifier: ^0.41.1
         version: 0.41.1
@@ -2609,10 +2609,6 @@ packages:
     resolution: {integrity: sha512-u8+CxH8Q0E1Bf3PrmaxOWki826wHrOci2GLEyQ5Lj5UI+CzFOWzhNuq213PI9y2RDY53uizt5qMuXcrAO13bYw==}
     engines: {node: '>=16'}

-  '@tus/server@1.10.0':
-    resolution: {integrity: sha512-wiHRlCSq13ApgizYvP17y/utf/ztF6trWmuUnJ92FFTcoztF2MzEnx1gsL2XtSltSbsOfc/S6xT+PqP2x/DGgw==}
-    engines: {node: '>=16'}
-
   '@tus/utils@0.5.0':
     resolution: {integrity: sha512-SFJC9db7hJ5O9HbvpN9EKJYdjn8cx8tGgwFDDHPRXh+qBNYdb/MK6kh/vN6Rh82QH07D4k4nwGh4jlpugo6F7g==}
     engines: {node: '>=16'}
@@ -9225,17 +9221,6 @@ snapshots:
       - aws-crt
       - supports-color

-  '@tus/server@1.10.0':
-    dependencies:
-      '@tus/utils': 0.5.0
-      debug: 4.4.0
-      lodash.throttle: 4.1.1
-    optionalDependencies:
-      '@redis/client': 1.6.0
-      ioredis: 5.4.1
-    transitivePeerDependencies:
-      - supports-color
-
   '@tus/utils@0.5.0': {}

   '@types/babel__core@7.20.5':