import EventEmitter from 'node:events' import stream from 'node:stream/promises' import { addAbortSignal, PassThrough } from 'node:stream' import type http from 'node:http' import type { ServerOptions } from '../types' import throttle from 'lodash.throttle' import { CancellationContext, DataStore, ERRORS, EVENTS, StreamLimiter, Upload } from '../utils' /** * 正则表达式,用于从请求 URL 中提取文件 ID。 * 该正则表达式匹配 URL 中最后一个斜杠后的所有字符,直到字符串结束。 * - `([^/]+)`:捕获组,匹配一个或多个非斜杠字符。 * - `\/?$`:匹配可选的斜杠,并确保匹配到字符串的末尾。 * 示例: * - 输入 `/files/12345`,匹配结果为 `12345`。 * - 输入 `/files/12345/`,匹配结果为 `12345`。 */ const reExtractFileID = /([^/]+)\/?$/ /** * 正则表达式,用于从 HTTP 请求头中的 `forwarded` 字段提取主机名。 * 该正则表达式匹配 `host=""` 或 `host=` 格式的字符串,并提取 `` 部分。 * - `host="?`:匹配 `host=` 或 `host="`。 * - `([^";]+)`:捕获组,匹配一个或多个非分号和双引号的字符。 * 示例: * - 输入 `host="example.com"`,匹配结果为 `example.com`。 * - 输入 `host=example.com`,匹配结果为 `example.com`。 */ const reForwardedHost = /host="?([^";]+)/ /** * 正则表达式,用于从 HTTP 请求头中的 `forwarded` 字段提取协议(如 `http` 或 `https`)。 * 该正则表达式匹配 `proto=` 格式的字符串,并提取 `` 部分。 * - `proto=`:匹配 `proto=` 字符串。 * - `(https?)`:捕获组,匹配 `http` 或 `https`。 * 示例: * - 输入 `proto=https`,匹配结果为 `https`。 * - 输入 `proto=http`,匹配结果为 `http`。 */ const reForwardedProto = /proto=(https?)/ /** * BaseHandler 类是一个基础处理器,用于处理 TUS 协议的上传请求。 * 它继承自 Node.js 的 EventEmitter,允许发出和监听事件。 */ export class BaseHandler extends EventEmitter { options: ServerOptions store: DataStore /** * 构造函数,初始化 BaseHandler 实例。 * @param store - 数据存储对象,用于处理上传数据的存储。 * @param options - 服务器配置选项。 * @throws 如果未提供 store 参数,则抛出错误。 */ constructor(store: DataStore, options: ServerOptions) { super() if (!store) { throw new Error('Store must be defined') } this.store = store this.options = options } /** * 向客户端发送 HTTP 响应。 * @param res - HTTP 响应对象。 * @param status - HTTP 状态码。 * @param headers - 响应头对象。 * @param body - 响应体内容。 * @returns 返回结束的响应对象。 */ write(res: http.ServerResponse, status: number, headers = {}, body = '') { if (status !== 204) { // @ts-expect-error not explicitly typed but possible headers['Content-Length'] = Buffer.byteLength(body, 'utf8') } res.writeHead(status, headers) res.write(body) return res.end() } /** * 生成上传文件的 URL。 * @param req - HTTP 请求对象。 * @param id - 文件 ID。 * @returns 返回生成的 URL。 */ generateUrl(req: http.IncomingMessage, id: string) { const path = this.options.path === '/' ? '' : this.options.path if (this.options.generateUrl) { // 使用用户定义的 generateUrl 函数生成 URL const { proto, host } = this.extractHostAndProto(req) return this.options.generateUrl(req, { proto, host, path: path, id, }) } // 默认实现 if (this.options.relativeLocation) { return `${path}/${id}` } const { proto, host } = this.extractHostAndProto(req) return `${proto}://${host}${path}/${id}` } /** * 从请求中提取文件 ID。 * @param req - HTTP 请求对象。 * @returns 返回提取的文件 ID,如果未找到则返回 undefined。 */ getFileIdFromRequest(req: http.IncomingMessage) { const match = reExtractFileID.exec(req.url as string) if (this.options.getFileIdFromRequest) { const lastPath = match ? decodeURIComponent(match[1]) : undefined return this.options.getFileIdFromRequest(req, lastPath) } if (!match || this.options.path.includes(match[1])) { return } return decodeURIComponent(match[1]) } /** * 从 HTTP 请求中提取主机名和协议信息。 * 该方法首先检查是否启用了尊重转发头(respectForwardedHeaders)选项, * 如果启用,则从请求头中提取转发的主机名和协议信息。 * 如果未启用或未找到转发信息,则使用请求头中的主机名和默认协议(http)。 * * @param req - HTTP 请求对象,包含请求头等信息。 * @returns 返回包含主机名和协议的对象。 */ protected extractHostAndProto(req: http.IncomingMessage) { let proto: string | undefined let host: string | undefined // 如果启用了尊重转发头选项 if (this.options.respectForwardedHeaders) { // 从请求头中获取 forwarded 字段 const forwarded = req.headers.forwarded as string | undefined if (forwarded) { // 使用正则表达式从 forwarded 字段中提取主机名和协议 host ??= reForwardedHost.exec(forwarded)?.[1] proto ??= reForwardedProto.exec(forwarded)?.[1] } // 从请求头中获取 x-forwarded-host 和 x-forwarded-proto 字段 const forwardHost = req.headers['x-forwarded-host'] const forwardProto = req.headers['x-forwarded-proto'] // 检查 x-forwarded-proto 是否为有效的协议(http 或 https) // @ts-expect-error we can pass undefined if (['http', 'https'].includes(forwardProto)) { proto ??= forwardProto as string } // 如果 x-forwarded-host 存在,则使用它作为主机名 host ??= forwardHost as string } // 如果未从转发头中获取到主机名,则使用请求头中的 host 字段 host ??= req.headers.host // 如果未从转发头中获取到协议,则默认使用 http proto ??= 'http' // 返回包含主机名和协议的对象 return { host: host as string, proto } } /** * 获取锁对象。 * @param req - HTTP 请求对象。 * @returns 返回锁对象。 */ protected async getLocker(req: http.IncomingMessage) { if (typeof this.options.locker === 'function') { return this.options.locker(req) } return this.options.locker } /** * 获取锁并锁定资源。 * @param req - HTTP 请求对象。 * @param id - 文件 ID。 * @param context - 取消上下文对象。 * @returns 返回锁对象。 */ protected async acquireLock( req: http.IncomingMessage, id: string, context: CancellationContext ) { const locker = await this.getLocker(req) const lock = locker.newLock(id) await lock.lock(() => { context.cancel() }) return lock } /** * 将请求体数据写入存储。 * 该方法负责将 HTTP 请求体中的数据流式传输到存储系统中,同时处理取消操作、错误处理和进度更新。 * * @param req - HTTP 请求对象,包含请求体数据流。 * @param upload - 上传对象,包含上传的元数据(如文件 ID、偏移量等)。 * @param maxFileSize - 允许的最大文件大小,用于限制写入的数据量。 * @param context - 取消上下文对象,用于处理取消操作。 * @returns 返回一个 Promise,解析为写入的字节数。 */ protected writeToStore( req: http.IncomingMessage, upload: Upload, maxFileSize: number, context: CancellationContext ) { // 使用 Promise 包装异步操作,以便更好地处理取消和错误。 // biome-ignore lint/suspicious/noAsyncPromiseExecutor: return new Promise(async (resolve, reject) => { // 检查是否已被取消,如果已取消则直接拒绝 Promise。 if (context.signal.aborted) { reject(ERRORS.ABORTED) return } // 创建一个 PassThrough 流作为代理,用于管理请求流。 // PassThrough 流是一个透明的流,它允许数据通过而不进行任何修改。 // 使用代理流的好处是可以在不影响原始请求流的情况下中止写入过程。 const proxy = new PassThrough() // 将取消信号与代理流关联,以便在取消时自动中止流。 addAbortSignal(context.signal, proxy) // 监听代理流的错误事件,处理流中的错误。 proxy.on('error', (err) => { // 取消请求流与代理流的管道连接。 req.unpipe(proxy) // 如果错误是 AbortError,则返回 ABORTED 错误,否则返回原始错误。 reject(err.name === 'AbortError' ? ERRORS.ABORTED : err) }) // 使用 throttle 函数创建一个节流函数,用于定期触发 POST_RECEIVE_V2 事件。 // 该事件用于通知上传进度,避免频繁触发事件导致性能问题。 const postReceive = throttle( (offset: number) => { // 触发 POST_RECEIVE_V2 事件,传递当前上传的偏移量。 this.emit(EVENTS.POST_RECEIVE_V2, req, { ...upload, offset }) }, // 设置节流的时间间隔,避免事件触发过于频繁。 this.options.postReceiveInterval, { leading: false } ) // 临时变量,用于跟踪当前写入的偏移量。 let tempOffset = upload.offset // 监听代理流的 data 事件,每当有数据块通过时更新偏移量并触发进度事件。 proxy.on('data', (chunk: Buffer) => { tempOffset += chunk.byteLength postReceive(tempOffset) }) // 监听请求流的 error 事件,处理请求流中的错误。 req.on('error', () => { // 如果代理流未关闭,则优雅地结束流,以便将剩余的字节作为 incompletePart 上传到存储。 if (!proxy.closed) { proxy.end() } }) // 使用 stream.pipeline 将请求流通过代理流和 StreamLimiter 传输到存储系统。 // StreamLimiter 用于限制写入的数据量,确保不超过最大文件大小。 stream .pipeline( // 将请求流通过代理流传输。 req.pipe(proxy), // 使用 StreamLimiter 限制写入的数据量。 new StreamLimiter(maxFileSize), // 将数据流写入存储系统。 async (stream) => { return this.store.write(stream as StreamLimiter, upload.id, upload.offset) } ) // 如果管道操作成功,则解析 Promise 并返回写入的字节数。 .then(resolve) // 如果管道操作失败,则拒绝 Promise 并返回错误。 .catch(reject) }) } /** * 获取配置的最大文件大小。 * @param req - HTTP 请求对象。 * @param id - 文件 ID。 * @returns 返回配置的最大文件大小。 */ getConfiguredMaxSize(req: http.IncomingMessage, id: string | null) { if (typeof this.options.maxSize === 'function') { return this.options.maxSize(req, id) } return this.options.maxSize ?? 0 } /** * 计算上传请求体的最大允许大小。 * 该函数考虑了服务器配置的最大大小和上传的具体情况,例如大小是延迟的还是固定的。 * @param req - HTTP 请求对象。 * @param file - 上传对象。 * @param configuredMaxSize - 配置的最大大小。 * @returns 返回计算出的最大请求体大小。 * @throws 如果上传大小超过允许的最大大小,则抛出 ERRORS.ERR_SIZE_EXCEEDED 错误。 */ async calculateMaxBodySize( req: http.IncomingMessage, file: Upload, configuredMaxSize?: number ) { // 如果未明确提供,则使用服务器配置的最大大小。 configuredMaxSize ??= await this.getConfiguredMaxSize(req, file.id) // 从请求中解析 Content-Length 头(如果未设置,则默认为 0)。 const length = Number.parseInt(req.headers['content-length'] || '0', 10) const offset = file.offset const hasContentLengthSet = req.headers['content-length'] !== undefined const hasConfiguredMaxSizeSet = configuredMaxSize > 0 if (file.sizeIsDeferred) { // 对于延迟大小的上传,如果不是分块传输,则检查配置的最大大小。 if ( hasContentLengthSet && hasConfiguredMaxSizeSet && offset + length > configuredMaxSize ) { throw ERRORS.ERR_SIZE_EXCEEDED } if (hasConfiguredMaxSizeSet) { return configuredMaxSize - offset } return Number.MAX_SAFE_INTEGER } // 检查上传是否适合文件的大小(当大小不是延迟的时)。 if (offset + length > (file.size || 0)) { throw ERRORS.ERR_SIZE_EXCEEDED } if (hasContentLengthSet) { return length } return (file.size || 0) - offset } }