fenghuo/packages/storage/src/tus/server.ts

434 lines
15 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import http from 'node:http';
import { EventEmitter } from 'node:events';
import debug from 'debug';
import { GetHandler } from './handlers/GetHandler';
import { HeadHandler } from './handlers/HeadHandler';
import { OptionsHandler } from './handlers/OptionsHandler';
import { PatchHandler } from './handlers/PatchHandler';
import { PostHandler } from './handlers/PostHandler';
import { DeleteHandler } from './handlers/DeleteHandler';
import { validateHeader } from './validators/HeaderValidator';
import type stream from 'node:stream';
import type { ServerOptions, RouteHandler, WithOptional } from './types';
import { MemoryLocker } from './lockers';
import {
EVENTS,
Upload,
DataStore,
REQUEST_METHODS,
ERRORS,
TUS_RESUMABLE,
EXPOSED_HEADERS,
CancellationContext,
} from './utils';
/**
* 处理器类型映射
* 定义了TUS服务器支持的各种HTTP方法对应的处理器实例类型
*/
type Handlers = {
GET: InstanceType<typeof GetHandler>; // GET请求处理器
HEAD: InstanceType<typeof HeadHandler>; // HEAD请求处理器
OPTIONS: InstanceType<typeof OptionsHandler>; // OPTIONS请求处理器
PATCH: InstanceType<typeof PatchHandler>; // PATCH请求处理器
POST: InstanceType<typeof PostHandler>; // POST请求处理器
DELETE: InstanceType<typeof DeleteHandler>; // DELETE请求处理器
};
/**
* TUS服务器事件接口定义
* 描述了服务器在不同阶段触发的事件及其处理函数签名
*/
interface TusEvents {
/**
* 文件创建后触发
* @param req HTTP请求对象
* @param res HTTP响应对象
* @param upload 上传对象实例
* @param url 生成的文件URL
*/
[EVENTS.POST_CREATE]: (req: http.IncomingMessage, res: http.ServerResponse, upload: Upload, url: string) => void;
/**
* @deprecated 文件接收事件(已废弃)
* 建议使用 POST_RECEIVE_V2 替代
*/
[EVENTS.POST_RECEIVE]: (req: http.IncomingMessage, res: http.ServerResponse, upload: Upload) => void;
/**
* 文件接收事件V2版本
* @param req HTTP请求对象
* @param upload 上传对象实例
*/
[EVENTS.POST_RECEIVE_V2]: (req: http.IncomingMessage, upload: Upload) => void;
/**
* 文件上传完成事件
* @param req HTTP请求对象
* @param res HTTP响应对象
* @param upload 上传对象实例
*/
[EVENTS.POST_FINISH]: (req: http.IncomingMessage, res: http.ServerResponse, upload: Upload) => void;
/**
* 文件终止上传事件
* @param req HTTP请求对象
* @param res HTTP响应对象
* @param id 文件唯一标识符
*/
[EVENTS.POST_TERMINATE]: (req: http.IncomingMessage, res: http.ServerResponse, id: string) => void;
}
/**
* EventEmitter事件处理器类型别名
*/
type on = EventEmitter['on'];
type emit = EventEmitter['emit'];
/**
* TUS服务器接口声明
* 继承EventEmitter,支持事件监听和触发
*/
export declare interface Server {
/**
* 为指定事件注册监听器
* @param event 事件名称必须是TusEvents的键之一
* @param listener 事件触发时执行的回调函数
* @returns 返回Server实例以支持链式调用
*/
on<Event extends keyof TusEvents>(event: Event, listener: TusEvents[Event]): this;
/**
* 为指定事件注册监听器(通用版本)
* @param eventName 事件名称
* @param listener 事件触发时执行的回调函数
* @returns 返回Server实例以支持链式调用
*/
on(eventName: Parameters<on>[0], listener: Parameters<on>[1]): this;
/**
* 触发指定事件
* @param event 事件名称必须是TusEvents的键之一
* @param listener 事件触发时执行的回调函数
* @returns 返回emit函数的返回值
*/
emit<Event extends keyof TusEvents>(event: Event, listener: TusEvents[Event]): ReturnType<emit>;
/**
* 触发指定事件(通用版本)
* @param eventName 事件名称
* @param listener 事件触发时执行的回调函数
* @returns 返回emit函数的返回值
*/
emit(eventName: Parameters<emit>[0], listener: Parameters<emit>[1]): ReturnType<emit>;
}
/**
* 调试日志工具实例
*/
const log = debug('tus-node-server');
// biome-ignore lint/suspicious/noUnsafeDeclarationMerging: it's fine
export class Server extends EventEmitter {
datastore: DataStore;
handlers: Handlers;
options: ServerOptions;
/**
* Server 构造函数
* @param options - 服务器配置选项,包含数据存储和可选配置
* @throws 如果未提供 options、path 或 datastore将抛出错误
*/
constructor(
options: WithOptional<ServerOptions, 'locker'> & {
datastore: DataStore;
},
) {
super();
if (!options) {
throw new Error("'options' must be defined");
}
if (!options.path) {
throw new Error("'path' is not defined; must have a path");
}
if (!options.datastore) {
throw new Error("'datastore' is not defined; must have a datastore");
}
if (!options.locker) {
options.locker = new MemoryLocker();
}
if (!options.lockDrainTimeout) {
options.lockDrainTimeout = 3000;
}
if (!options.postReceiveInterval) {
options.postReceiveInterval = 1000;
}
const { datastore, ...rest } = options;
this.options = rest as ServerOptions;
this.datastore = datastore;
this.handlers = {
// GET 请求处理器应在具体实现中编写
GET: new GetHandler(this.datastore, this.options),
// 这些方法按照 tus 协议处理
HEAD: new HeadHandler(this.datastore, this.options),
OPTIONS: new OptionsHandler(this.datastore, this.options),
PATCH: new PatchHandler(this.datastore, this.options),
POST: new PostHandler(this.datastore, this.options),
DELETE: new DeleteHandler(this.datastore, this.options),
};
// 任何以方法为键分配给此对象的处理器将用于响应这些请求。
// 当数据存储分配给服务器时,它们会被设置/重置。
// 从服务器中移除任何事件监听器时,必须先从每个处理器中移除监听器。
// 这必须在添加 'newListener' 监听器之前完成,以避免为所有请求处理器添加 'removeListener' 事件监听器。
this.on('removeListener', (event: string, listener) => {
this.datastore.removeListener(event, listener);
for (const method of REQUEST_METHODS) {
this.handlers[method].removeListener(event, listener);
}
});
// 当事件监听器被添加到服务器时,确保它们从请求处理器冒泡到服务器级别。
this.on('newListener', (event: string, listener) => {
this.datastore.on(event, listener);
for (const method of REQUEST_METHODS) {
this.handlers[method].on(event, listener);
}
});
}
/**
* 注册 GET 请求处理器
* @param path - 请求路径
* @param handler - 请求处理器
*/
get(path: string, handler: RouteHandler) {
this.handlers.GET.registerPath(path, handler);
}
/**
* 主服务器请求监听器,在每个 'request' 事件上调用
* @param req - HTTP 请求对象
* @param res - HTTP 响应对象
* @returns 返回 HTTP 响应对象或流
*/
async handle(
req: http.IncomingMessage,
res: http.ServerResponse,
// biome-ignore lint/suspicious/noConfusingVoidType: it's fine
): Promise<http.ServerResponse | stream.Writable | void> {
const context = this.createContext(req);
log(`[TusServer] handle: ${req.method} ${req.url}`);
// 允许覆盖 HTTP 方法。这样做的原因是某些库/环境不支持 PATCH 和 DELETE 请求,例如浏览器中的 Flash 和 Java 部分环境
if (req.headers['x-http-method-override']) {
req.method = (req.headers['x-http-method-override'] as string).toUpperCase();
}
const onError = async (error: { status_code?: number; body?: string; message: string }) => {
let status_code = error.status_code || ERRORS.UNKNOWN_ERROR.status_code;
let body = error.body || `${ERRORS.UNKNOWN_ERROR.body}${error.message || ''}\n`;
if (this.options.onResponseError) {
const errorMapping = await this.options.onResponseError(req, res, error as Error);
if (errorMapping) {
status_code = errorMapping.status_code;
body = errorMapping.body;
}
}
return this.write(context, req, res, status_code, body);
};
if (req.method === 'GET') {
const handler = this.handlers.GET;
return handler.send(req, res).catch(onError);
}
// Tus-Resumable 头部必须包含在每个请求和响应中,除了 OPTIONS 请求。其值必须是客户端或服务器使用的协议版本。
res.setHeader('Tus-Resumable', TUS_RESUMABLE);
if (req.method !== 'OPTIONS' && req.headers['tus-resumable'] === undefined) {
return this.write(context, req, res, 412, 'Tus-Resumable Required\n');
}
// 验证所有必需的头部以符合 tus 协议
const invalid_headers: string[] = [];
for (const header_name in req.headers) {
if (req.method === 'OPTIONS') {
continue;
}
// 内容类型仅对 PATCH 请求进行检查。对于所有其他请求方法,它将被忽略并视为未设置内容类型,
// 因为某些 HTTP 客户端可能会为此头部强制执行默认值。
// 参见 https://github.com/tus/tus-node-server/pull/116
if (header_name.toLowerCase() === 'content-type' && req.method !== 'PATCH') {
continue;
}
if (!validateHeader(header_name, req.headers[header_name] as string | undefined)) {
log(`Invalid ${header_name} header: ${req.headers[header_name]}`);
invalid_headers.push(header_name);
}
}
if (invalid_headers.length > 0) {
return this.write(context, req, res, 400, `Invalid ${invalid_headers.join(' ')}\n`);
}
// 启用 CORS
res.setHeader('Access-Control-Allow-Origin', this.getCorsOrigin(req));
res.setHeader('Access-Control-Expose-Headers', EXPOSED_HEADERS);
if (this.options.allowedCredentials === true) {
res.setHeader('Access-Control-Allow-Credentials', 'true');
}
// 调用请求方法的处理器
const handler = this.handlers[req.method as keyof Handlers];
if (handler) {
return handler.send(req, res, context).catch(onError);
}
return this.write(context, req, res, 404, 'Not found\n');
}
/**
* 获取CORS跨域资源共享允许的源地址
*
* 该方法用于确定并返回允许的CORS源地址。首先检查请求头中的`origin`是否在允许的源列表中,
* 如果在则返回该`origin`;如果不在但允许的源列表不为空,则返回列表中的第一个源地址;
* 如果允许的源列表为空,则返回通配符`*`,表示允许所有源地址。
*
* @param req HTTP请求对象包含请求头等信息
* @returns 返回允许的CORS源地址可能是请求头中的`origin`、允许的源列表中的第一个源地址或通配符`*`
*
* 设计考量:
* - 该方法考虑了CORS策略的灵活性允许通过配置动态指定允许的源地址。
* - 通过返回通配符`*`简化了默认情况下的CORS配置但需要注意这可能带来安全风险。
*/
private getCorsOrigin(req: http.IncomingMessage): string {
const origin = req.headers.origin;
// 检查请求头中的`origin`是否在允许的源列表中
const isOriginAllowed = this.options.allowedOrigins?.some((allowedOrigin) => allowedOrigin === origin) ?? true;
// 如果`origin`存在且在允许的源列表中,则返回该`origin`
if (origin && isOriginAllowed) {
return origin;
}
// 如果允许的源列表不为空,则返回列表中的第一个源地址
if (this.options.allowedOrigins && this.options.allowedOrigins.length > 0) {
return this.options.allowedOrigins[0]!;
}
// 如果允许的源列表为空,则返回通配符`*`,表示允许所有源地址
return '*';
}
/**
* 写入响应
* @param context - 取消上下文
* @param req - HTTP 请求对象
* @param res - HTTP 响应对象
* @param status - HTTP 状态码
* @param body - 响应体
* @param headers - 响应头部
* @returns 返回 HTTP 响应对象
*/
write(
context: CancellationContext,
req: http.IncomingMessage,
res: http.ServerResponse,
status: number,
body = '',
headers: Record<string, string | number> = {},
) {
const isAborted = context.signal.aborted;
if (status !== 204) {
(headers as any)['Content-Length'] = Buffer.byteLength(body, 'utf8');
}
if (isAborted) {
// 此条件处理请求被标记为中止的情况。
// 在这种情况下,服务器通知客户端连接将被关闭。
// 这是通过在响应中设置 'Connection' 头部为 'close' 来传达的。
// 这一步对于防止服务器继续处理不再需要的请求至关重要,从而节省资源。
(headers as any).Connection = 'close';
// 为响应 ('res') 添加 'finish' 事件的事件监听器。
// 'finish' 事件在响应已发送给客户端时触发。
// 一旦响应完成,请求 ('req') 对象将被销毁。
// 销毁请求对象是释放与此请求相关的任何资源的关键步骤,因为它已经被中止。
res.on('finish', () => {
req.destroy();
});
}
res.writeHead(status, headers);
res.write(body);
return res.end();
}
/**
* 启动服务器监听
* @param args - 监听参数
* @returns 返回 HTTP 服务器实例
*/
// biome-ignore lint/suspicious/noExplicitAny: todo
listen(...args: any[]): http.Server {
return http.createServer(this.handle.bind(this)).listen(...args);
}
/**
* 清理过期的上传
* @returns 返回删除的过期上传数量
* @throws 如果数据存储不支持过期扩展,将抛出错误
*/
cleanUpExpiredUploads(): Promise<number> {
if (!this.datastore.hasExtension('expiration')) {
throw ERRORS.UNSUPPORTED_EXPIRATION_EXTENSION;
}
return this.datastore.deleteExpired();
}
/**
* 创建取消上下文
* @param req - HTTP 请求对象
* @returns 返回取消上下文
*/
protected createContext(req: http.IncomingMessage) {
// 初始化两个 AbortController
// 1. `requestAbortController` 用于即时请求终止,特别适用于在发生错误时停止客户端上传。
// 2. `abortWithDelayController` 用于在终止前引入延迟,允许服务器有时间完成正在进行的操作。
// 这在未来的请求可能需要获取当前请求持有的锁时特别有用。
const requestAbortController = new AbortController();
const abortWithDelayController = new AbortController();
// 当 `abortWithDelayController` 被触发时调用此函数,以在指定延迟后中止请求。
const onDelayedAbort = (err: unknown) => {
abortWithDelayController.signal.removeEventListener('abort', onDelayedAbort);
setTimeout(() => {
requestAbortController.abort(err);
}, this.options.lockDrainTimeout);
};
abortWithDelayController.signal.addEventListener('abort', onDelayedAbort);
// 当请求关闭时,移除监听器以避免内存泄漏。
req.on('close', () => {
abortWithDelayController.signal.removeEventListener('abort', onDelayedAbort);
});
// 返回一个对象,包含信号和两个中止请求的方法。
// `signal` 用于监听请求中止事件。
// `abort` 方法用于立即中止请求。
// `cancel` 方法用于启动延迟中止序列。
return {
signal: requestAbortController.signal,
abort: () => {
// 立即中止请求
if (!requestAbortController.signal.aborted) {
requestAbortController.abort(ERRORS.ABORTED);
}
},
cancel: () => {
// 启动延迟中止序列,除非它已经在进行中。
if (!abortWithDelayController.signal.aborted) {
abortWithDelayController.abort(ERRORS.ABORTED);
}
},
};
}
}