collect-system/packages/tus/src/handlers/BaseHandler.ts

364 lines
13 KiB
TypeScript
Raw Normal View History

2025-01-06 08:45:23 +08:00
import EventEmitter from 'node:events'
import stream from 'node:stream/promises'
import { addAbortSignal, PassThrough } from 'node:stream'
import type http from 'node:http'
import type { ServerOptions } from '../types'
import throttle from 'lodash.throttle'
import { CancellationContext, DataStore, ERRORS, EVENTS, StreamLimiter, Upload } from '../utils'
/**
* URL ID
* URL
* - `([^/]+)`
* - `\/?$`
*
* - `/files/12345` `12345`
* - `/files/12345/` `12345`
*/
const reExtractFileID = /([^/]+)\/?$/
/**
* HTTP `forwarded`
* `host="<value>"` `host=<value>` `<value>`
* - `host="?` `host=` `host="`
* - `([^";]+)`
*
* - `host="example.com"` `example.com`
* - `host=example.com` `example.com`
*/
const reForwardedHost = /host="?([^";]+)/
/**
* HTTP `forwarded` `http` `https`
* `proto=<value>` `<value>`
* - `proto=` `proto=`
* - `(https?)` `http` `https`
*
* - `proto=https` `https`
* - `proto=http` `http`
*/
const reForwardedProto = /proto=(https?)/
/**
* BaseHandler TUS
* Node.js EventEmitter
*/
export class BaseHandler extends EventEmitter {
options: ServerOptions
store: DataStore
/**
* BaseHandler
* @param store -
* @param options -
* @throws store
*/
constructor(store: DataStore, options: ServerOptions) {
super()
if (!store) {
throw new Error('Store must be defined')
}
this.store = store
this.options = options
}
/**
* HTTP
* @param res - HTTP
* @param status - HTTP
* @param headers -
* @param body -
* @returns
*/
write(res: http.ServerResponse, status: number, headers = {}, body = '') {
if (status !== 204) {
// @ts-expect-error not explicitly typed but possible
headers['Content-Length'] = Buffer.byteLength(body, 'utf8')
}
res.writeHead(status, headers)
res.write(body)
return res.end()
}
/**
* URL
* @param req - HTTP
* @param id - ID
* @returns URL
*/
generateUrl(req: http.IncomingMessage, id: string) {
const path = this.options.path === '/' ? '' : this.options.path
if (this.options.generateUrl) {
// 使用用户定义的 generateUrl 函数生成 URL
const { proto, host } = this.extractHostAndProto(req)
return this.options.generateUrl(req, {
proto,
host,
path: path,
id,
})
}
// 默认实现
if (this.options.relativeLocation) {
return `${path}/${id}`
}
const { proto, host } = this.extractHostAndProto(req)
return `${proto}://${host}${path}/${id}`
}
/**
* ID
* @param req - HTTP
* @returns ID undefined
*/
getFileIdFromRequest(req: http.IncomingMessage) {
const match = reExtractFileID.exec(req.url as string)
if (this.options.getFileIdFromRequest) {
const lastPath = match ? decodeURIComponent(match[1]) : undefined
return this.options.getFileIdFromRequest(req, lastPath)
}
if (!match || this.options.path.includes(match[1])) {
return
}
return decodeURIComponent(match[1])
}
/**
* HTTP
* respectForwardedHeaders
*
* 使http
*
* @param req - HTTP
* @returns
*/
protected extractHostAndProto(req: http.IncomingMessage) {
let proto: string | undefined
let host: string | undefined
// 如果启用了尊重转发头选项
if (this.options.respectForwardedHeaders) {
// 从请求头中获取 forwarded 字段
const forwarded = req.headers.forwarded as string | undefined
if (forwarded) {
// 使用正则表达式从 forwarded 字段中提取主机名和协议
host ??= reForwardedHost.exec(forwarded)?.[1]
proto ??= reForwardedProto.exec(forwarded)?.[1]
}
// 从请求头中获取 x-forwarded-host 和 x-forwarded-proto 字段
const forwardHost = req.headers['x-forwarded-host']
const forwardProto = req.headers['x-forwarded-proto']
// 检查 x-forwarded-proto 是否为有效的协议http 或 https
// @ts-expect-error we can pass undefined
if (['http', 'https'].includes(forwardProto)) {
proto ??= forwardProto as string
}
// 如果 x-forwarded-host 存在,则使用它作为主机名
host ??= forwardHost as string
}
// 如果未从转发头中获取到主机名,则使用请求头中的 host 字段
host ??= req.headers.host
// 如果未从转发头中获取到协议,则默认使用 http
proto ??= 'http'
// 返回包含主机名和协议的对象
return { host: host as string, proto }
}
/**
*
* @param req - HTTP
* @returns
*/
protected async getLocker(req: http.IncomingMessage) {
if (typeof this.options.locker === 'function') {
return this.options.locker(req)
}
return this.options.locker
}
/**
*
* @param req - HTTP
* @param id - ID
* @param context -
* @returns
*/
protected async acquireLock(
req: http.IncomingMessage,
id: string,
context: CancellationContext
) {
const locker = await this.getLocker(req)
const lock = locker.newLock(id)
await lock.lock(() => {
context.cancel()
})
return lock
}
/**
*
* HTTP
*
* @param req - HTTP
* @param upload - ID
* @param maxFileSize -
* @param context -
* @returns Promise
*/
protected writeToStore(
req: http.IncomingMessage,
upload: Upload,
maxFileSize: number,
context: CancellationContext
) {
// 使用 Promise 包装异步操作,以便更好地处理取消和错误。
// biome-ignore lint/suspicious/noAsyncPromiseExecutor: <explanation>
return new Promise<number>(async (resolve, reject) => {
// 检查是否已被取消,如果已取消则直接拒绝 Promise。
if (context.signal.aborted) {
reject(ERRORS.ABORTED)
return
}
// 创建一个 PassThrough 流作为代理,用于管理请求流。
// PassThrough 流是一个透明的流,它允许数据通过而不进行任何修改。
// 使用代理流的好处是可以在不影响原始请求流的情况下中止写入过程。
const proxy = new PassThrough()
// 将取消信号与代理流关联,以便在取消时自动中止流。
addAbortSignal(context.signal, proxy)
// 监听代理流的错误事件,处理流中的错误。
proxy.on('error', (err) => {
// 取消请求流与代理流的管道连接。
req.unpipe(proxy)
// 如果错误是 AbortError则返回 ABORTED 错误,否则返回原始错误。
reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
})
// 使用 throttle 函数创建一个节流函数,用于定期触发 POST_RECEIVE_V2 事件。
// 该事件用于通知上传进度,避免频繁触发事件导致性能问题。
const postReceive = throttle(
(offset: number) => {
// 触发 POST_RECEIVE_V2 事件,传递当前上传的偏移量。
this.emit(EVENTS.POST_RECEIVE_V2, req, { ...upload, offset })
},
// 设置节流的时间间隔,避免事件触发过于频繁。
this.options.postReceiveInterval,
{ leading: false }
)
// 临时变量,用于跟踪当前写入的偏移量。
let tempOffset = upload.offset
// 监听代理流的 data 事件,每当有数据块通过时更新偏移量并触发进度事件。
proxy.on('data', (chunk: Buffer) => {
tempOffset += chunk.byteLength
postReceive(tempOffset)
})
// 监听请求流的 error 事件,处理请求流中的错误。
req.on('error', () => {
// 如果代理流未关闭,则优雅地结束流,以便将剩余的字节作为 incompletePart 上传到存储。
if (!proxy.closed) {
proxy.end()
}
})
// 使用 stream.pipeline 将请求流通过代理流和 StreamLimiter 传输到存储系统。
// StreamLimiter 用于限制写入的数据量,确保不超过最大文件大小。
stream
.pipeline(
// 将请求流通过代理流传输。
req.pipe(proxy),
// 使用 StreamLimiter 限制写入的数据量。
new StreamLimiter(maxFileSize),
// 将数据流写入存储系统。
async (stream) => {
return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
}
)
// 如果管道操作成功,则解析 Promise 并返回写入的字节数。
.then(resolve)
// 如果管道操作失败,则拒绝 Promise 并返回错误。
.catch(reject)
})
}
/**
*
* @param req - HTTP
* @param id - ID
* @returns
*/
getConfiguredMaxSize(req: http.IncomingMessage, id: string | null) {
if (typeof this.options.maxSize === 'function') {
return this.options.maxSize(req, id)
}
return this.options.maxSize ?? 0
}
/**
*
*
* @param req - HTTP
* @param file -
* @param configuredMaxSize -
* @returns
* @throws ERRORS.ERR_SIZE_EXCEEDED
*/
async calculateMaxBodySize(
req: http.IncomingMessage,
file: Upload,
configuredMaxSize?: number
) {
// 如果未明确提供,则使用服务器配置的最大大小。
configuredMaxSize ??= await this.getConfiguredMaxSize(req, file.id)
// 从请求中解析 Content-Length 头(如果未设置,则默认为 0
const length = Number.parseInt(req.headers['content-length'] || '0', 10)
const offset = file.offset
const hasContentLengthSet = req.headers['content-length'] !== undefined
const hasConfiguredMaxSizeSet = configuredMaxSize > 0
if (file.sizeIsDeferred) {
// 对于延迟大小的上传,如果不是分块传输,则检查配置的最大大小。
if (
hasContentLengthSet &&
hasConfiguredMaxSizeSet &&
offset + length > configuredMaxSize
) {
throw ERRORS.ERR_SIZE_EXCEEDED
}
if (hasConfiguredMaxSizeSet) {
return configuredMaxSize - offset
}
return Number.MAX_SAFE_INTEGER
}
// 检查上传是否适合文件的大小(当大小不是延迟的时)。
if (offset + length > (file.size || 0)) {
throw ERRORS.ERR_SIZE_EXCEEDED
}
if (hasContentLengthSet) {
return length
}
return (file.size || 0) - offset
}
}