This commit is contained in:
ditiqi 2025-03-02 16:45:36 +08:00
parent 8c0a478ae0
commit d0a5571b50
1 changed files with 456 additions and 389 deletions

View File

@ -1,452 +1,519 @@
import http from 'node:http' import http from "node:http";
import { EventEmitter } from 'node:events' import { EventEmitter } from "node:events";
import debug from 'debug' import debug from "debug";
import { GetHandler } from './handlers/GetHandler' import { GetHandler } from "./handlers/GetHandler";
import { HeadHandler } from './handlers/HeadHandler' import { HeadHandler } from "./handlers/HeadHandler";
import { OptionsHandler } from './handlers/OptionsHandler' import { OptionsHandler } from "./handlers/OptionsHandler";
import { PatchHandler } from './handlers/PatchHandler' import { PatchHandler } from "./handlers/PatchHandler";
import { PostHandler } from './handlers/PostHandler' import { PostHandler } from "./handlers/PostHandler";
import { DeleteHandler } from './handlers/DeleteHandler' import { DeleteHandler } from "./handlers/DeleteHandler";
import { validateHeader } from './validators/HeaderValidator' import { validateHeader } from "./validators/HeaderValidator";
import type stream from 'node:stream' import type stream from "node:stream";
import type { ServerOptions, RouteHandler, WithOptional } from './types' import type { ServerOptions, RouteHandler, WithOptional } from "./types";
import { MemoryLocker } from './lockers' import { MemoryLocker } from "./lockers";
import { EVENTS, Upload, DataStore, REQUEST_METHODS, ERRORS, TUS_RESUMABLE, EXPOSED_HEADERS, CancellationContext } from './utils' import {
import { message } from 'antd'; EVENTS,
Upload,
DataStore,
REQUEST_METHODS,
ERRORS,
TUS_RESUMABLE,
EXPOSED_HEADERS,
CancellationContext,
} from "./utils";
/** /**
* *
* TUS服务器支持的各种HTTP方法对应的处理器实例类型 * TUS服务器支持的各种HTTP方法对应的处理器实例类型
*/ */
type Handlers = { type Handlers = {
GET: InstanceType<typeof GetHandler> // GET请求处理器 GET: InstanceType<typeof GetHandler>; // GET请求处理器
HEAD: InstanceType<typeof HeadHandler> // HEAD请求处理器 HEAD: InstanceType<typeof HeadHandler>; // HEAD请求处理器
OPTIONS: InstanceType<typeof OptionsHandler> // OPTIONS请求处理器 OPTIONS: InstanceType<typeof OptionsHandler>; // OPTIONS请求处理器
PATCH: InstanceType<typeof PatchHandler> // PATCH请求处理器 PATCH: InstanceType<typeof PatchHandler>; // PATCH请求处理器
POST: InstanceType<typeof PostHandler> // POST请求处理器 POST: InstanceType<typeof PostHandler>; // POST请求处理器
DELETE: InstanceType<typeof DeleteHandler> // DELETE请求处理器 DELETE: InstanceType<typeof DeleteHandler>; // DELETE请求处理器
} };
/** /**
* TUS服务器事件接口定义 * TUS服务器事件接口定义
* *
*/ */
interface TusEvents { interface TusEvents {
/** /**
* *
* @param req HTTP请求对象 * @param req HTTP请求对象
* @param res HTTP响应对象 * @param res HTTP响应对象
* @param upload * @param upload
* @param url URL * @param url URL
*/ */
[EVENTS.POST_CREATE]: ( [EVENTS.POST_CREATE]: (
req: http.IncomingMessage, req: http.IncomingMessage,
res: http.ServerResponse, res: http.ServerResponse,
upload: Upload, upload: Upload,
url: string url: string
) => void ) => void;
/** /**
* @deprecated () * @deprecated ()
* 使 POST_RECEIVE_V2 * 使 POST_RECEIVE_V2
*/ */
[EVENTS.POST_RECEIVE]: ( [EVENTS.POST_RECEIVE]: (
req: http.IncomingMessage, req: http.IncomingMessage,
res: http.ServerResponse, res: http.ServerResponse,
upload: Upload upload: Upload
) => void ) => void;
/** /**
* V2版本 * V2版本
* @param req HTTP请求对象 * @param req HTTP请求对象
* @param upload * @param upload
*/ */
[EVENTS.POST_RECEIVE_V2]: (req: http.IncomingMessage, upload: Upload) => void [EVENTS.POST_RECEIVE_V2]: (
req: http.IncomingMessage,
upload: Upload
) => void;
/** /**
* *
* @param req HTTP请求对象 * @param req HTTP请求对象
* @param res HTTP响应对象 * @param res HTTP响应对象
* @param upload * @param upload
*/ */
[EVENTS.POST_FINISH]: ( [EVENTS.POST_FINISH]: (
req: http.IncomingMessage, req: http.IncomingMessage,
res: http.ServerResponse, res: http.ServerResponse,
upload: Upload upload: Upload
) => void ) => void;
/** /**
* *
* @param req HTTP请求对象 * @param req HTTP请求对象
* @param res HTTP响应对象 * @param res HTTP响应对象
* @param id * @param id
*/ */
[EVENTS.POST_TERMINATE]: ( [EVENTS.POST_TERMINATE]: (
req: http.IncomingMessage, req: http.IncomingMessage,
res: http.ServerResponse, res: http.ServerResponse,
id: string id: string
) => void ) => void;
} }
/** /**
* EventEmitter事件处理器类型别名 * EventEmitter事件处理器类型别名
*/ */
type on = EventEmitter['on'] type on = EventEmitter["on"];
type emit = EventEmitter['emit'] type emit = EventEmitter["emit"];
/** /**
* TUS服务器接口声明 * TUS服务器接口声明
* EventEmitter, * EventEmitter,
*/ */
export declare interface Server { export declare interface Server {
/** /**
* *
* @param event TusEvents的键之一 * @param event TusEvents的键之一
* @param listener * @param listener
* @returns Server实例以支持链式调用 * @returns Server实例以支持链式调用
*/ */
on<Event extends keyof TusEvents>(event: Event, listener: TusEvents[Event]): this on<Event extends keyof TusEvents>(
/** event: Event,
* listener: TusEvents[Event]
* @param eventName ): this;
* @param listener /**
* @returns Server实例以支持链式调用 *
*/ * @param eventName
on(eventName: Parameters<on>[0], listener: Parameters<on>[1]): this * @param listener
/** * @returns Server实例以支持链式调用
* */
* @param event TusEvents的键之一 on(eventName: Parameters<on>[0], listener: Parameters<on>[1]): this;
* @param listener /**
* @returns emit函数的返回值 *
*/ * @param event TusEvents的键之一
emit<Event extends keyof TusEvents>( * @param listener
event: Event, * @returns emit函数的返回值
listener: TusEvents[Event] */
): ReturnType<emit> emit<Event extends keyof TusEvents>(
/** event: Event,
* listener: TusEvents[Event]
* @param eventName ): ReturnType<emit>;
* @param listener /**
* @returns emit函数的返回值 *
*/ * @param eventName
emit(eventName: Parameters<emit>[0], listener: Parameters<emit>[1]): ReturnType<emit> * @param listener
* @returns emit函数的返回值
*/
emit(
eventName: Parameters<emit>[0],
listener: Parameters<emit>[1]
): ReturnType<emit>;
} }
/** /**
* *
*/ */
const log = debug('tus-node-server') const log = debug("tus-node-server");
// biome-ignore lint/suspicious/noUnsafeDeclarationMerging: it's fine // biome-ignore lint/suspicious/noUnsafeDeclarationMerging: it's fine
export class Server extends EventEmitter { export class Server extends EventEmitter {
datastore: DataStore datastore: DataStore;
handlers: Handlers handlers: Handlers;
options: ServerOptions options: ServerOptions;
/** /**
* Server * Server
* @param options - * @param options -
* @throws optionspath datastore * @throws optionspath datastore
*/ */
constructor(options: WithOptional<ServerOptions, 'locker'> & { datastore: DataStore }) { constructor(
super() options: WithOptional<ServerOptions, "locker"> & {
datastore: DataStore;
}
) {
super();
if (!options) { if (!options) {
throw new Error("'options' must be defined") throw new Error("'options' must be defined");
} }
if (!options.path) { if (!options.path) {
throw new Error("'path' is not defined; must have a path") throw new Error("'path' is not defined; must have a path");
} }
if (!options.datastore) { if (!options.datastore) {
throw new Error("'datastore' is not defined; must have a datastore") throw new Error(
} "'datastore' is not defined; must have a datastore"
);
}
if (!options.locker) { if (!options.locker) {
options.locker = new MemoryLocker() options.locker = new MemoryLocker();
} }
if (!options.lockDrainTimeout) { if (!options.lockDrainTimeout) {
options.lockDrainTimeout = 3000 options.lockDrainTimeout = 3000;
} }
if (!options.postReceiveInterval) { if (!options.postReceiveInterval) {
options.postReceiveInterval = 1000 options.postReceiveInterval = 1000;
} }
const { datastore, ...rest } = options const { datastore, ...rest } = options;
this.options = rest as ServerOptions this.options = rest as ServerOptions;
this.datastore = datastore this.datastore = datastore;
this.handlers = { this.handlers = {
// GET 请求处理器应在具体实现中编写 // GET 请求处理器应在具体实现中编写
GET: new GetHandler(this.datastore, this.options), GET: new GetHandler(this.datastore, this.options),
// 这些方法按照 tus 协议处理 // 这些方法按照 tus 协议处理
HEAD: new HeadHandler(this.datastore, this.options), HEAD: new HeadHandler(this.datastore, this.options),
OPTIONS: new OptionsHandler(this.datastore, this.options), OPTIONS: new OptionsHandler(this.datastore, this.options),
PATCH: new PatchHandler(this.datastore, this.options), PATCH: new PatchHandler(this.datastore, this.options),
POST: new PostHandler(this.datastore, this.options), POST: new PostHandler(this.datastore, this.options),
DELETE: new DeleteHandler(this.datastore, this.options), DELETE: new DeleteHandler(this.datastore, this.options),
} };
// 任何以方法为键分配给此对象的处理器将用于响应这些请求。 // 任何以方法为键分配给此对象的处理器将用于响应这些请求。
// 当数据存储分配给服务器时,它们会被设置/重置。 // 当数据存储分配给服务器时,它们会被设置/重置。
// 从服务器中移除任何事件监听器时,必须先从每个处理器中移除监听器。 // 从服务器中移除任何事件监听器时,必须先从每个处理器中移除监听器。
// 这必须在添加 'newListener' 监听器之前完成,以避免为所有请求处理器添加 'removeListener' 事件监听器。 // 这必须在添加 'newListener' 监听器之前完成,以避免为所有请求处理器添加 'removeListener' 事件监听器。
this.on('removeListener', (event: string, listener) => { this.on("removeListener", (event: string, listener) => {
this.datastore.removeListener(event, listener) this.datastore.removeListener(event, listener);
for (const method of REQUEST_METHODS) { for (const method of REQUEST_METHODS) {
this.handlers[method].removeListener(event, listener) this.handlers[method].removeListener(event, listener);
} }
}) });
// 当事件监听器被添加到服务器时,确保它们从请求处理器冒泡到服务器级别。 // 当事件监听器被添加到服务器时,确保它们从请求处理器冒泡到服务器级别。
this.on('newListener', (event: string, listener) => { this.on("newListener", (event: string, listener) => {
this.datastore.on(event, listener) this.datastore.on(event, listener);
for (const method of REQUEST_METHODS) { for (const method of REQUEST_METHODS) {
this.handlers[method].on(event, listener) this.handlers[method].on(event, listener);
} }
}) });
} }
/** /**
* GET * GET
* @param path - * @param path -
* @param handler - * @param handler -
*/ */
get(path: string, handler: RouteHandler) { get(path: string, handler: RouteHandler) {
this.handlers.GET.registerPath(path, handler) this.handlers.GET.registerPath(path, handler);
} }
/** /**
* 'request' * 'request'
* @param req - HTTP * @param req - HTTP
* @param res - HTTP * @param res - HTTP
* @returns HTTP * @returns HTTP
*/ */
async handle( async handle(
req: http.IncomingMessage, req: http.IncomingMessage,
res: http.ServerResponse res: http.ServerResponse
// biome-ignore lint/suspicious/noConfusingVoidType: it's fine // biome-ignore lint/suspicious/noConfusingVoidType: it's fine
): Promise<http.ServerResponse | stream.Writable | void> { ): Promise<http.ServerResponse | stream.Writable | void> {
const context = this.createContext(req);
log(`[TusServer] handle: ${req.method} ${req.url}`);
// 允许覆盖 HTTP 方法。这样做的原因是某些库/环境不支持 PATCH 和 DELETE 请求,例如浏览器中的 Flash 和 Java 部分环境
if (req.headers["x-http-method-override"]) {
req.method = (
req.headers["x-http-method-override"] as string
).toUpperCase();
}
const onError = async (error: {
status_code?: number;
body?: string;
message: string;
}) => {
let status_code =
error.status_code || ERRORS.UNKNOWN_ERROR.status_code;
let body =
error.body ||
`${ERRORS.UNKNOWN_ERROR.body}${error.message || ""}\n`;
if (this.options.onResponseError) {
const errorMapping = await this.options.onResponseError(
req,
res,
error as Error
);
if (errorMapping) {
status_code = errorMapping.status_code;
body = errorMapping.body;
}
}
return this.write(context, req, res, status_code, body);
};
if (req.method === "GET") {
const handler = this.handlers.GET;
return handler.send(req, res).catch(onError);
}
const context = this.createContext(req) // Tus-Resumable 头部必须包含在每个请求和响应中,除了 OPTIONS 请求。其值必须是客户端或服务器使用的协议版本。
log(`[TusServer] handle: ${req.method} ${req.url}`) res.setHeader("Tus-Resumable", TUS_RESUMABLE);
// 允许覆盖 HTTP 方法。这样做的原因是某些库/环境不支持 PATCH 和 DELETE 请求,例如浏览器中的 Flash 和 Java 部分环境 if (
if (req.headers['x-http-method-override']) { req.method !== "OPTIONS" &&
req.method = (req.headers['x-http-method-override'] as string).toUpperCase() req.headers["tus-resumable"] === undefined
} ) {
const onError = async (error: { return this.write(
status_code?: number context,
body?: string req,
message: string res,
}) => { 412,
"Tus-Resumable Required\n"
);
}
// 验证所有必需的头部以符合 tus 协议
const invalid_headers = [];
for (const header_name in req.headers) {
if (req.method === "OPTIONS") {
continue;
}
// 内容类型仅对 PATCH 请求进行检查。对于所有其他请求方法,它将被忽略并视为未设置内容类型,
// 因为某些 HTTP 客户端可能会为此头部强制执行默认值。
// 参见 https://github.com/tus/tus-node-server/pull/116
if (
header_name.toLowerCase() === "content-type" &&
req.method !== "PATCH"
) {
continue;
}
if (
!validateHeader(
header_name,
req.headers[header_name] as string | undefined
)
) {
log(
`Invalid ${header_name} header: ${req.headers[header_name]}`
);
invalid_headers.push(header_name);
}
}
let status_code = error.status_code || ERRORS.UNKNOWN_ERROR.status_code if (invalid_headers.length > 0) {
let body = error.body || `${ERRORS.UNKNOWN_ERROR.body}${error.message || ''}\n` return this.write(
if (this.options.onResponseError) { context,
const errorMapping = await this.options.onResponseError(req, res, error as Error) req,
if (errorMapping) { res,
status_code = errorMapping.status_code 400,
body = errorMapping.body `Invalid ${invalid_headers.join(" ")}\n`
} );
} }
return this.write(context, req, res, status_code, body) // 启用 CORS
} res.setHeader("Access-Control-Allow-Origin", this.getCorsOrigin(req));
if (req.method === 'GET') { res.setHeader("Access-Control-Expose-Headers", EXPOSED_HEADERS);
const handler = this.handlers.GET if (this.options.allowedCredentials === true) {
return handler.send(req, res).catch(onError) res.setHeader("Access-Control-Allow-Credentials", "true");
} }
// Tus-Resumable 头部必须包含在每个请求和响应中,除了 OPTIONS 请求。其值必须是客户端或服务器使用的协议版本。 // 调用请求方法的处理器
res.setHeader('Tus-Resumable', TUS_RESUMABLE) const handler = this.handlers[req.method as keyof Handlers];
if (req.method !== 'OPTIONS' && req.headers['tus-resumable'] === undefined) { if (handler) {
return this.write(context, req, res, 412, 'Tus-Resumable Required\n') return handler.send(req, res, context).catch(onError);
} }
// 验证所有必需的头部以符合 tus 协议
const invalid_headers = []
for (const header_name in req.headers) {
if (req.method === 'OPTIONS') {
continue
}
// 内容类型仅对 PATCH 请求进行检查。对于所有其他请求方法,它将被忽略并视为未设置内容类型,
// 因为某些 HTTP 客户端可能会为此头部强制执行默认值。
// 参见 https://github.com/tus/tus-node-server/pull/116
if (header_name.toLowerCase() === 'content-type' && req.method !== 'PATCH') {
continue
}
if (!validateHeader(header_name, req.headers[header_name] as string | undefined)) {
log(`Invalid ${header_name} header: ${req.headers[header_name]}`)
invalid_headers.push(header_name)
}
}
if (invalid_headers.length > 0) { return this.write(context, req, res, 404, "Not found\n");
return this.write(context, req, res, 400, `Invalid ${invalid_headers.join(' ')}\n`) }
}
// 启用 CORS
res.setHeader('Access-Control-Allow-Origin', this.getCorsOrigin(req))
res.setHeader('Access-Control-Expose-Headers', EXPOSED_HEADERS)
if (this.options.allowedCredentials === true) {
res.setHeader('Access-Control-Allow-Credentials', 'true')
}
// 调用请求方法的处理器 /**
const handler = this.handlers[req.method as keyof Handlers] * CORS
if (handler) { *
* CORS源地址`origin`
* `origin`
* `*`
*
* @param req HTTP请求对象
* @returns CORS源地址`origin``*`
*
*
* - CORS策略的灵活性
* - `*`CORS配置
*/
private getCorsOrigin(req: http.IncomingMessage): string {
const origin = req.headers.origin;
// 检查请求头中的`origin`是否在允许的源列表中
const isOriginAllowed =
this.options.allowedOrigins?.some(
(allowedOrigin) => allowedOrigin === origin
) ?? true;
// 如果`origin`存在且在允许的源列表中,则返回该`origin`
if (origin && isOriginAllowed) {
return origin;
}
// 如果允许的源列表不为空,则返回列表中的第一个源地址
if (
this.options.allowedOrigins &&
this.options.allowedOrigins.length > 0
) {
return this.options.allowedOrigins[0];
}
return handler.send(req, res, context).catch(onError) // 如果允许的源列表为空,则返回通配符`*`,表示允许所有源地址
} return "*";
}
return this.write(context, req, res, 404, 'Not found\n') /**
} *
* @param context -
* @param req - HTTP
* @param res - HTTP
* @param status - HTTP
* @param body -
* @param headers -
* @returns HTTP
*/
write(
context: CancellationContext,
req: http.IncomingMessage,
res: http.ServerResponse,
status: number,
body = "",
headers = {}
) {
const isAborted = context.signal.aborted;
/** if (status !== 204) {
* CORS // @ts-expect-error not explicitly typed but possible
* headers["Content-Length"] = Buffer.byteLength(body, "utf8");
* CORS源地址`origin` }
* `origin`
* `*`
*
* @param req HTTP请求对象
* @returns CORS源地址`origin``*`
*
*
* - CORS策略的灵活性
* - `*`CORS配置
*/
private getCorsOrigin(req: http.IncomingMessage): string {
const origin = req.headers.origin
// 检查请求头中的`origin`是否在允许的源列表中
const isOriginAllowed =
this.options.allowedOrigins?.some((allowedOrigin) => allowedOrigin === origin) ??
true
// 如果`origin`存在且在允许的源列表中,则返回该`origin`
if (origin && isOriginAllowed) {
return origin
}
// 如果允许的源列表不为空,则返回列表中的第一个源地址
if (this.options.allowedOrigins && this.options.allowedOrigins.length > 0) {
return this.options.allowedOrigins[0]
}
// 如果允许的源列表为空,则返回通配符`*`,表示允许所有源地址 if (isAborted) {
return '*' // 此条件处理请求被标记为中止的情况。
} // 在这种情况下,服务器通知客户端连接将被关闭。
// 这是通过在响应中设置 'Connection' 头部为 'close' 来传达的。
// 这一步对于防止服务器继续处理不再需要的请求至关重要,从而节省资源。
/** // @ts-expect-error not explicitly typed but possible
* headers.Connection = "close";
* @param context -
* @param req - HTTP
* @param res - HTTP
* @param status - HTTP
* @param body -
* @param headers -
* @returns HTTP
*/
write(
context: CancellationContext,
req: http.IncomingMessage,
res: http.ServerResponse,
status: number,
body = '',
headers = {}
) {
const isAborted = context.signal.aborted
if (status !== 204) { // 为响应 ('res') 添加 'finish' 事件的事件监听器。
// @ts-expect-error not explicitly typed but possible // 'finish' 事件在响应已发送给客户端时触发。
headers['Content-Length'] = Buffer.byteLength(body, 'utf8') // 一旦响应完成,请求 ('req') 对象将被销毁。
} // 销毁请求对象是释放与此请求相关的任何资源的关键步骤,因为它已经被中止。
res.on("finish", () => {
req.destroy();
});
}
if (isAborted) { res.writeHead(status, headers);
// 此条件处理请求被标记为中止的情况。 res.write(body);
// 在这种情况下,服务器通知客户端连接将被关闭。 return res.end();
// 这是通过在响应中设置 'Connection' 头部为 'close' 来传达的。 }
// 这一步对于防止服务器继续处理不再需要的请求至关重要,从而节省资源。
// @ts-expect-error not explicitly typed but possible /**
headers.Connection = 'close' *
* @param args -
* @returns HTTP
*/
// biome-ignore lint/suspicious/noExplicitAny: todo
listen(...args: any[]): http.Server {
return http.createServer(this.handle.bind(this)).listen(...args);
}
// 为响应 ('res') 添加 'finish' 事件的事件监听器。 /**
// 'finish' 事件在响应已发送给客户端时触发。 *
// 一旦响应完成,请求 ('req') 对象将被销毁。 * @returns
// 销毁请求对象是释放与此请求相关的任何资源的关键步骤,因为它已经被中止。 * @throws
res.on('finish', () => { */
req.destroy() cleanUpExpiredUploads(): Promise<number> {
}) if (!this.datastore.hasExtension("expiration")) {
} throw ERRORS.UNSUPPORTED_EXPIRATION_EXTENSION;
}
res.writeHead(status, headers) return this.datastore.deleteExpired();
res.write(body) }
return res.end()
}
/** /**
* *
* @param args - * @param req - HTTP
* @returns HTTP * @returns
*/ */
// biome-ignore lint/suspicious/noExplicitAny: todo protected createContext(req: http.IncomingMessage) {
listen(...args: any[]): http.Server { // 初始化两个 AbortController
return http.createServer(this.handle.bind(this)).listen(...args) // 1. `requestAbortController` 用于即时请求终止,特别适用于在发生错误时停止客户端上传。
} // 2. `abortWithDelayController` 用于在终止前引入延迟,允许服务器有时间完成正在进行的操作。
// 这在未来的请求可能需要获取当前请求持有的锁时特别有用。
const requestAbortController = new AbortController();
const abortWithDelayController = new AbortController();
/** // 当 `abortWithDelayController` 被触发时调用此函数,以在指定延迟后中止请求。
* const onDelayedAbort = (err: unknown) => {
* @returns abortWithDelayController.signal.removeEventListener(
* @throws "abort",
*/ onDelayedAbort
cleanUpExpiredUploads(): Promise<number> { );
if (!this.datastore.hasExtension('expiration')) { setTimeout(() => {
throw ERRORS.UNSUPPORTED_EXPIRATION_EXTENSION requestAbortController.abort(err);
} }, this.options.lockDrainTimeout);
};
abortWithDelayController.signal.addEventListener(
"abort",
onDelayedAbort
);
return this.datastore.deleteExpired() // 当请求关闭时,移除监听器以避免内存泄漏。
} req.on("close", () => {
abortWithDelayController.signal.removeEventListener(
"abort",
onDelayedAbort
);
});
/** // 返回一个对象,包含信号和两个中止请求的方法。
* // `signal` 用于监听请求中止事件。
* @param req - HTTP // `abort` 方法用于立即中止请求。
* @returns // `cancel` 方法用于启动延迟中止序列。
*/ return {
protected createContext(req: http.IncomingMessage) { signal: requestAbortController.signal,
// 初始化两个 AbortController abort: () => {
// 1. `requestAbortController` 用于即时请求终止,特别适用于在发生错误时停止客户端上传。 // 立即中止请求
// 2. `abortWithDelayController` 用于在终止前引入延迟,允许服务器有时间完成正在进行的操作。 if (!requestAbortController.signal.aborted) {
// 这在未来的请求可能需要获取当前请求持有的锁时特别有用。 requestAbortController.abort(ERRORS.ABORTED);
const requestAbortController = new AbortController() }
const abortWithDelayController = new AbortController() },
cancel: () => {
// 当 `abortWithDelayController` 被触发时调用此函数,以在指定延迟后中止请求。 // 启动延迟中止序列,除非它已经在进行中。
const onDelayedAbort = (err: unknown) => { if (!abortWithDelayController.signal.aborted) {
abortWithDelayController.signal.removeEventListener('abort', onDelayedAbort) abortWithDelayController.abort(ERRORS.ABORTED);
setTimeout(() => { }
requestAbortController.abort(err) },
}, this.options.lockDrainTimeout) };
} }
abortWithDelayController.signal.addEventListener('abort', onDelayedAbort) }
// 当请求关闭时,移除监听器以避免内存泄漏。
req.on('close', () => {
abortWithDelayController.signal.removeEventListener('abort', onDelayedAbort)
})
// 返回一个对象,包含信号和两个中止请求的方法。
// `signal` 用于监听请求中止事件。
// `abort` 方法用于立即中止请求。
// `cancel` 方法用于启动延迟中止序列。
return {
signal: requestAbortController.signal,
abort: () => {
// 立即中止请求
if (!requestAbortController.signal.aborted) {
requestAbortController.abort(ERRORS.ABORTED)
}
},
cancel: () => {
// 启动延迟中止序列,除非它已经在进行中。
if (!abortWithDelayController.signal.aborted) {
abortWithDelayController.abort(ERRORS.ABORTED)
}
},
}
}
}