// packages/client/src/upload/uploadManager.ts

import axios, { AxiosInstance } from 'axios';
import { ChunkDto, UploadProgress, UploadStatusInfoDto, UUIDGenerator } from '@nice/common';
import { UploadOptions } from './types';
import { calculateFileIdentifier } from '../tools';
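
/**
 * Client-side manager for chunked, resumable file uploads.
 *
 * Each file is mapped to a stable identifier (via calculateFileIdentifier),
 * split into fixed-size chunks, and uploaded with a bounded number of
 * concurrent requests. Chunks the server already holds are skipped, so
 * interrupted uploads can be resumed. Progress callbacks report percentage,
 * average speed, and an estimated remaining time.
 */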
export class UploadManager {
  private readonly axios: AxiosInstance;
  private readonly chunkSize: number;
  private readonly concurrency: number;
  private readonly retries: number;
  private readonly clientId: string;
  private activeUploads: Map<string, boolean> = new Map();
  private abortControllers: Map<string, AbortController> = new Map();
  constructor(options: UploadOptions = {}) {
    const {
      baseUrl = '/upload',
      chunkSize = 10 * 1024 * 1024,
      concurrency = 3,
      retries = 3,
    } = options;
    this.axios = axios.create({ baseURL: baseUrl });
    this.chunkSize = chunkSize;
    this.concurrency = concurrency;
    this.retries = retries;
    this.clientId = options.clientId || UUIDGenerator.generate();
  }
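
  /**
   * Uploads a file, skipping chunks the server already holds. Calls
   * onProgress per finished chunk, onSuccess once the file is complete,
   * and onError (and rethrows) on failure; cancellations are swallowed.
   */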
  async uploadFile(file: File, options: UploadOptions = {}): Promise<void> {
    const identifier = await calculateFileIdentifier(file);
    const controller = new AbortController();
    this.abortControllers.set(identifier, controller);
    this.activeUploads.set(identifier, true);
    try {
      // Check if the file is already fully uploaded
      const statusInfo = await this.checkUploadStatusInfo(identifier);
      if (statusInfo?.status === 'completed') {
        options.onSuccess?.({ identifier, filename: file.name });
        return;
      }
      const chunks = await this.prepareChunks(file, identifier);
      // Normalize to a Set so lookups work whether the server reports the
      // uploaded chunk numbers as an array or a Set-like iterable
      const uploadedChunks = new Set<number>(statusInfo?.chunks ?? []);
      // Filter out chunks that have already been uploaded
      const remainingChunks = chunks.filter(chunk => !uploadedChunks.has(chunk.chunkNumber));
      await this.uploadChunks(remainingChunks, file, options, controller.signal);
      // Notify the caller once every remaining chunk has been uploaded
      options.onSuccess?.({ identifier, filename: file.name });
    } catch (error) {
      if (axios.isCancel(error)) {
        return;
      }
      options.onError?.(error as Error);
      throw error;
    } finally {
      this.abortControllers.delete(identifier);
      this.activeUploads.delete(identifier);
    }
  }
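
  /** Builds the chunk descriptors (1-based chunk numbers) for a file. */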
  private async prepareChunks(file: File, identifier: string): Promise<ChunkDto[]> {
    const chunks: ChunkDto[] = [];
    const totalChunks = Math.ceil(file.size / this.chunkSize);
    for (let i = 0; i < totalChunks; i++) {
      chunks.push({
        identifier,
        filename: file.name,
        chunkNumber: i + 1,
        totalChunks,
        currentChunkSize: Math.min(this.chunkSize, file.size - i * this.chunkSize),
        totalSize: file.size,
      });
    }
    return chunks;
  }
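
  /**
   * Uploads the given chunks with at most `concurrency` requests in flight,
   * emitting progress (with a 5-chunk moving-average speed) after each chunk.
   */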
  private async uploadChunks(
    chunks: ChunkDto[],
    file: File,
    options: UploadOptions,
    signal: AbortSignal
  ): Promise<void> {
    const chunkQueue = [...chunks]; // Work on a copy of the chunks array
    let activeUploads = 0;
    let completedChunks = 0;
    let uploadedBytes = 0;
    // Track the upload speed of the most recent chunks
    const speedBuffer: number[] = [];
    const SPEED_BUFFER_SIZE = 5; // Keep the speeds of the last 5 chunks
    return new Promise((resolve, reject) => {
      const uploadNextChunk = async () => {
        if (completedChunks === chunks.length) {
          if (activeUploads === 0) { // Only resolve once all in-flight uploads have finished
            resolve();
          }
          return;
        }
        while (activeUploads < this.concurrency && chunkQueue.length > 0) {
          const chunk = chunkQueue.shift();
          if (!chunk) break;
          const chunkStartTime = Date.now();
          activeUploads++;
          this.uploadChunk(chunk, file, signal)
            .then(() => {
              const chunkEndTime = Date.now();
              const chunkUploadTime = (chunkEndTime - chunkStartTime) / 1000; // seconds
              completedChunks++;
              uploadedBytes += chunk.currentChunkSize;
              activeUploads--;
              // Upload speed of the chunk that just finished
              const currentSpeed = chunkUploadTime > 0
                ? chunk.currentChunkSize / chunkUploadTime
                : 0;
              // Maintain a sliding window of recent chunk speeds
              speedBuffer.push(currentSpeed);
              if (speedBuffer.length > SPEED_BUFFER_SIZE) {
                speedBuffer.shift();
              }
              // Average speed over the window
              const averageSpeed = speedBuffer.length > 0
                ? speedBuffer.reduce((a, b) => a + b, 0) / speedBuffer.length
                : 0;
              const totalUploadedBytes = uploadedBytes;
              const remainingBytes = file.size - totalUploadedBytes;
              // Estimate the remaining time from the average speed
              const remainingTime = averageSpeed > 0
                ? remainingBytes / averageSpeed
                : 0;
              // Progress is reported relative to the chunks handled in this call
              const progress: UploadProgress = {
                identifier: chunk.identifier,
                percentage: (completedChunks / chunks.length) * 100,
                uploadedSize: totalUploadedBytes,
                totalSize: file.size,
                speed: averageSpeed, // bytes per second
                remainingTime: remainingTime // seconds
              };
              options.onProgress?.(progress);
              uploadNextChunk();
            })
            .catch(reject);
        }
      };
      uploadNextChunk();
    });
  }
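
  /** Uploads a single chunk as multipart form data, retrying with linear backoff. */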
  private async uploadChunk(
    chunk: ChunkDto,
    file: File,
    signal: AbortSignal
  ): Promise<void> {
    const start = (chunk.chunkNumber - 1) * this.chunkSize;
    const end = Math.min(start + this.chunkSize, file.size);
    const chunkBlob = file.slice(start, end);
    const formData = new FormData();
    formData.append('chunk', JSON.stringify(chunk));
    formData.append('clientId', this.clientId);
    formData.append('file', chunkBlob);
    let attempts = 0;
    while (attempts < this.retries) {
      try {
        await this.axios.post('/chunk', formData, { signal });
        return;
      } catch (error) {
        // Don't retry a request that was cancelled by pauseUpload
        if (axios.isCancel(error)) throw error;
        attempts++;
        if (attempts === this.retries) throw error;
        // Back off linearly (1s, 2s, ...) before retrying
        await new Promise(resolve => setTimeout(resolve, 1000 * attempts));
      }
    }
  }
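
  /** Aborts in-flight requests for the file and notifies the server's pause endpoint. */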
  async pauseUpload(identifier: string): Promise<void> {
    const controller = this.abortControllers.get(identifier);
    if (controller) {
      controller.abort();
      this.abortControllers.delete(identifier);
    }
    try {
      // Tell the server the upload is paused
      await this.axios.post(`/pause/${identifier}`);
    } catch (error) {
      console.error('Error pausing upload:', error);
      throw error;
    }
  }
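
  /** Asks the server to resume the upload, then re-runs uploadFile so only the remaining chunks are sent. */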
  async resumeUpload(file: File, options: UploadOptions = {}): Promise<void> {
    const identifier = await calculateFileIdentifier(file);
    try {
      // Tell the server to resume, then continue the normal upload flow
      await this.axios.post(`/resume/${identifier}`);
      return this.uploadFile(file, options);
    } catch (error) {
      console.error('Error resuming upload:', error);
      throw error;
    }
  }
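
  /** Fetches the server-side upload status, or null if the lookup fails. */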
  private async checkUploadStatusInfo(identifier: string): Promise<UploadStatusInfoDto | null> {
    try {
      const response = await this.axios.get(`/status/${identifier}`);
      return response.data;
    } catch {
      // Treat any status-check failure as "no upload record yet"
      return null;
    }
  }
}
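
// Example usage: a minimal sketch assuming a browser environment with a File
// taken from an <input type="file"> element and a server that exposes the
// /upload routes used above. The callback shapes are inferred from how this
// class invokes them, not from the UploadOptions definition itself.
//
// const manager = new UploadManager({ chunkSize: 5 * 1024 * 1024, concurrency: 2 });
// const input = document.querySelector<HTMLInputElement>('input[type="file"]');
// input?.addEventListener('change', async () => {
//   const file = input.files?.[0];
//   if (!file) return;
//   await manager.uploadFile(file, {
//     onProgress: (p) => console.log(`${p.percentage.toFixed(1)}% @ ${(p.speed / 1024).toFixed(0)} KiB/s`),
//     onSuccess: ({ identifier }) => console.log('upload finished', identifier),
//     onError: (err) => console.error('upload failed', err),
//   });
// });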