collect-system/packages/tus/src/lockers/MemoryLocker.ts

145 lines
4.3 KiB
TypeScript
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* MemoryLocker 是一个实现了 Locker 接口的类,用于在内存中管理锁。
* 该类设计用于对资源进行独占访问控制,常用于上传管理等场景。
*
* 主要特性:
* - 通过使用基于内存的映射来跟踪锁,确保资源的独占访问。
* - 实现锁获取的超时机制,缓解死锁情况。
* - 通过不同的机制促进锁的立即和优雅释放。
*
* 锁定行为:
* - 当对已经锁定的资源调用 `lock` 方法时,会调用 `cancelReq` 回调。
* 这向当前锁持有者发出信号,表示另一个进程正在请求锁,鼓励他们尽快释放锁。
* - 锁尝试会持续到指定的超时时间。如果超时到期且锁仍然不可用,则抛出错误以指示锁获取失败。
*
* 锁的获取和释放:
* - `lock` 方法实现了等待机制,允许锁请求在锁可用时成功,或在超时期间失败。
* - `unlock` 方法释放锁,使资源可供其他请求使用。
*/
import { RequestRelease, Locker, ERRORS, Lock } from "../utils"
export interface MemoryLockerOptions {
acquireLockTimeout: number
}
interface LockEntry {
requestRelease: RequestRelease
}
export class MemoryLocker implements Locker {
timeout: number
locks = new Map<string, LockEntry>()
constructor(options?: MemoryLockerOptions) {
this.timeout = options?.acquireLockTimeout ?? 1000 * 30
}
/**
* 创建一个新的 MemoryLock 实例。
* @param id 锁的唯一标识符。
* @returns 返回一个新的 MemoryLock 实例。
*/
newLock(id: string) {
return new MemoryLock(id, this, this.timeout)
}
}
class MemoryLock implements Lock {
constructor(
private id: string,
private locker: MemoryLocker,
private timeout: number = 1000 * 30
) { }
/**
* 尝试获取锁。
* @param requestRelease 当锁被请求时调用的回调函数。
* @throws 如果锁获取超时,则抛出 ERRORS.ERR_LOCK_TIMEOUT 错误。
*/
async lock(requestRelease: RequestRelease): Promise<void> {
const abortController = new AbortController()
const lock = await Promise.race([
this.waitTimeout(abortController.signal),
this.acquireLock(this.id, requestRelease, abortController.signal),
])
abortController.abort()
if (!lock) {
throw ERRORS.ERR_LOCK_TIMEOUT
}
}
/**
* 尝试获取指定 ID 的锁。
* @param id 锁的唯一标识符。
* @param requestRelease 当锁被请求释放时调用的回调函数。
* @param signal 用于取消操作的 AbortSignal。
* @returns 如果成功获取锁,则返回 true否则返回 false。
*/
protected async acquireLock(
id: string,
requestRelease: RequestRelease,
signal: AbortSignal
): Promise<boolean> {
if (signal.aborted) {
return false
}
const lock = this.locker.locks.get(id)
if (!lock) {
const lock = {
requestRelease,
}
this.locker.locks.set(id, lock)
return true
}
await lock.requestRelease?.()
return await new Promise((resolve, reject) => {
// 使用 setImmediate 的原因:
// 1. 通过将递归调用推迟到下一个事件循环迭代来防止堆栈溢出。
// 2. 允许事件循环处理其他挂起的事件,保持服务器的响应性。
// 3. 通过给其他请求获取锁的机会,确保锁获取的公平性。
setImmediate(() => {
this.acquireLock(id, requestRelease, signal).then(resolve).catch(reject)
})
})
}
/**
* 释放锁。
* @throws 如果尝试释放未锁定的锁,则抛出错误。
*/
async unlock(): Promise<void> {
const lock = this.locker.locks.get(this.id)
if (!lock) {
throw new Error('Releasing an unlocked lock!')
}
this.locker.locks.delete(this.id)
}
/**
* 等待超时。
* @param signal 用于取消操作的 AbortSignal。
* @returns 如果超时,则返回 false。
*/
protected waitTimeout(signal: AbortSignal) {
return new Promise<boolean>((resolve) => {
const timeout = setTimeout(() => {
resolve(false)
}, this.timeout)
const abortListener = () => {
clearTimeout(timeout)
signal.removeEventListener('abort', abortListener)
resolve(false)
}
signal.addEventListener('abort', abortListener)
})
}
}