doctor-mail/apps/server/src/upload/upload.service.ts

285 lines
11 KiB
TypeScript

import { Injectable, Logger } from '@nestjs/common';
import { ChunkDto, UploadStatusInfo, UploadProgress, UploadStatusInfoDto } from '@nice/common';
import * as fs from 'fs/promises';
import * as path from 'path';
import mitt from 'mitt';
import { ChunkManager } from './chunk.manager';
import { UploadLockService } from './upload-lock.service';
import { calculateFileHash } from '@server/utils/file';
import { UploadEvent } from './types';
@Injectable()
export class UploadService {
private readonly logger = new Logger(UploadService.name);
private readonly uploadDir: string;
private readonly tempDir: string;
private readonly fileStatuses: Map<string, UploadStatusInfo> = new Map();
private readonly emitter = mitt<UploadEvent>();
// Performance Optimization: Configurable upload parameters
private MAX_CONCURRENT_UPLOADS = 5; // Configurable concurrent uploads
private MAX_CHUNK_SIZE = 10 * 1024 * 1024; // 10MB max chunk size
private UPLOAD_TIMEOUT = 30 * 60 * 1000; // 30 minutes timeout
constructor(
private chunkManager: ChunkManager,
private uploadLockService: UploadLockService
) {
// Validate upload directory configuration
this.uploadDir = this.validateUploadDirectory();
this.tempDir = path.join(this.uploadDir, 'temp');
this.initDirectories().catch(error => {
this.logger.error(`Failed to initialize upload directories: ${error.message}`);
process.exit(1);
});
this.configureUploadLimits();
}
private validateUploadDirectory(): string {
const uploadDir = process.env.UPLOAD_DIR;
if (!uploadDir) {
throw new Error('UPLOAD_DIR environment variable is not set');
}
return uploadDir;
}
private handleUploadError(identifier: string, error: unknown): void {
const status = this.fileStatuses.get(identifier);
const errorMessage = error instanceof Error ? error.message : String(error);
if (status) {
status.status = 'error';
status.error = errorMessage;
}
this.logger.error(`Upload error for ${identifier}: ${errorMessage}`);
this.emitter.emit('uploadError', {
identifier,
error: errorMessage,
filename: status?.filename
});
// Safe cleanup of temporary files
this.chunkManager.cleanup(identifier).catch(cleanupError =>
this.logger.error(`Cleanup failed for ${identifier}: ${cleanupError}`)
);
}
// Improved directory initialization with better error handling
private async initDirectories(): Promise<void> {
try {
await fs.mkdir(this.uploadDir, { recursive: true });
await fs.mkdir(this.tempDir, { recursive: true });
} catch (error) {
this.logger.error(`Directory initialization failed: ${error}`);
throw error;
}
}
private configureUploadLimits(): void {
const maxUploads = parseInt(process.env.MAX_CONCURRENT_UPLOADS || '5', 10);
const maxChunkSize = parseInt(process.env.MAX_CHUNK_SIZE || '10485760', 10);
this.MAX_CONCURRENT_UPLOADS = maxUploads;
this.MAX_CHUNK_SIZE = maxChunkSize;
}
// Enhanced input validation
async uploadChunk(chunk: ChunkDto, file: Express.Multer.File, clientId: string): Promise<void> {
// Validate chunk size
if (chunk.currentChunkSize > this.MAX_CHUNK_SIZE) {
throw new Error(`Chunk size exceeds maximum limit of ${this.MAX_CHUNK_SIZE} bytes`);
}
// Rate limiting and concurrent upload control
await this.controlConcurrentUploads();
const { identifier } = chunk;
const lockAcquired = this.uploadLockService.acquireLock(identifier, clientId);
if (!lockAcquired) {
throw new Error('Concurrent upload limit reached');
}
try {
// Add timeout mechanism
const uploadPromise = this.processChunkUpload(chunk, file);
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Upload timeout')), this.UPLOAD_TIMEOUT)
);
await Promise.race([uploadPromise, timeoutPromise]);
} catch (error) {
this.handleUploadError(identifier, error);
} finally {
this.uploadLockService.releaseLock(identifier, clientId);
}
}
private async controlConcurrentUploads(): Promise<void> {
const activeUploads = Array.from(this.fileStatuses.values())
.filter(status => status.status === 'uploading').length;
if (activeUploads >= this.MAX_CONCURRENT_UPLOADS) {
await new Promise(resolve => setTimeout(resolve, 1000)); // Wait and retry
await this.controlConcurrentUploads();
}
}
private async processChunkUpload(chunk: ChunkDto, file: Express.Multer.File): Promise<void> {
const { identifier } = chunk;
if (!this.fileStatuses.has(identifier)) {
await this.initUploadStatusInfo(chunk);
}
const status = this.fileStatuses.get(identifier);
if (!status) {
throw new Error('File status initialization failed');
}
if (status.chunks.has(chunk.chunkNumber)) {
return;
}
await this.chunkManager.save(chunk, file);
this.updateProgress(chunk);
if (this.isUploadComplete(identifier)) {
await this.finalizeUpload(chunk);
}
}
private async initUploadStatusInfo(chunk: ChunkDto): Promise<void> {
const { identifier, filename, totalSize } = chunk;
// 获取已经上传的chunks
const uploadedChunks = await this.chunkManager.getChunks(identifier);
const uploadedSize = uploadedChunks.length * chunk.currentChunkSize;
this.emitter.emit('uploadStart', {
identifier,
filename,
totalSize,
resuming: uploadedChunks.length > 0
});
this.fileStatuses.set(identifier, {
identifier,
filename,
totalSize,
uploadedSize,
status: 'uploading',
chunks: new Set(uploadedChunks), // 初始化已上传的chunks
startTime: Date.now(),
lastUpdateTime: Date.now()
});
}
private updateProgress(chunk: ChunkDto): void {
const status = this.fileStatuses.get(chunk.identifier);
if (!status) return;
// Use more efficient progress calculation
const newUploadedSize = chunk.chunkNumber * chunk.currentChunkSize;
const progressPercentage = Math.min(
Math.round((newUploadedSize / status.totalSize) * 100),
100
);
status.chunks.add(chunk.chunkNumber);
status.uploadedSize = newUploadedSize;
status.lastUpdateTime = Date.now();
const progress: UploadProgress = {
identifier: chunk.identifier,
percentage: progressPercentage,
uploadedSize: newUploadedSize,
totalSize: status.totalSize,
speed: this.calculateSpeed(status),
remainingTime: this.calculateRemainingTime(status)
};
status.progress = progress
}
private calculateRemainingTime(status: UploadStatusInfo): number {
const speed = this.calculateSpeed(status);
if (speed === 0) return 0;
const remainingBytes = status.totalSize - status.uploadedSize;
return Math.ceil(remainingBytes / speed); // Returns seconds remaining
}
private calculateSpeed(status: UploadStatusInfo): number {
const duration = (status.lastUpdateTime - status.startTime) / 1000; // in seconds
return duration > 0 ? Math.round(status.uploadedSize / duration) : 0; // bytes per second
}
private async finalizeUpload(chunk: ChunkDto): Promise<void> {
const { identifier, filename, totalChunks, checksum } = chunk;
const chunkDir = path.join(this.tempDir, identifier);
const finalPath = path.join(this.uploadDir, filename);
try {
await this.chunkManager.merge(chunkDir, finalPath, totalChunks);
// Calculate file hash
const calculatedHash = await calculateFileHash(finalPath);
// Verify file integrity
if (checksum && calculatedHash !== checksum) {
throw new Error('File integrity check failed: Checksums do not match');
}
await this.chunkManager.cleanup(identifier);
const status = this.fileStatuses.get(identifier);
if (status) {
status.status = 'completed';
status.hash = calculatedHash;
}
this.emitter.emit('uploadComplete', {
identifier,
filename,
size: status.totalSize,
hash: calculatedHash,
integrityVerified: !checksum || calculatedHash === checksum
});
} catch (error) {
this.handleUploadError(identifier, error);
}
}
private isUploadComplete(identifier: string): boolean {
const status = this.fileStatuses.get(identifier);
if (!status) return false;
return status.uploadedSize === status.totalSize;
}
deleteUploadStatusInfo(identifier: string): boolean {
// Check if the file status exists
const status = this.fileStatuses.get(identifier);
if (!status) {
// If the status doesn't exist, return false
return false;
}
// Check if the upload is still in progress
if (status.status === 'uploading') {
this.logger.warn(`Attempting to delete file status for ongoing upload: ${identifier}`);
return false;
}
// Remove the file status from the map
const deleted = this.fileStatuses.delete(identifier);
if (deleted) {
this.logger.log(`File status deleted for identifier: ${identifier}`);
}
return deleted;
}
checkUploadStatusInfo(identifier: string): UploadStatusInfoDto {
const lockInfo = this.uploadLockService.checkLock(identifier);
const statusInfo = {
...lockInfo,
...this.fileStatuses.get(identifier)
};
return statusInfo || null
}
pauseUpload(identifier: string, clientId: string): void {
const status = this.fileStatuses.get(identifier);
if (status) {
status.status = 'paused';
this.uploadLockService.releaseLock(identifier, clientId);
}
}
resumeUpload(identifier: string, clientId: string): boolean {
const status = this.fileStatuses.get(identifier);
if (status) {
// Try to reacquire the lock
const lockAcquired = this.uploadLockService.acquireLock(identifier, clientId);
if (lockAcquired) {
status.status = 'uploading';
return true;
}
}
return false;
}
onUploadEvent<K extends keyof UploadEvent>(
event: K,
handler: (data: UploadEvent[K]) => void
): () => void {
this.emitter.on(event, handler);
return () => this.emitter.off(event, handler);
}
}