fenghuo/packages/oidc-provider/src/schemas/index.ts

111 lines
4.0 KiB
TypeScript
Raw Normal View History

2025-05-29 12:23:29 +08:00
import { z } from 'zod';
export const codeChallengeMethods = z.enum(['plain', 'S256']);
// 授权请求Schema
export const authorizationRequestSchema = z.object({
response_type: z.string().min(1, '响应类型不能为空'),
client_id: z.string().min(1, '客户端ID不能为空'),
redirect_uri: z.string().url('重定向URI必须是有效的URL'),
scope: z.string().min(1, '作用域不能为空'),
state: z.string().optional(),
nonce: z.string().optional(),
code_challenge: z.string().optional(),
code_challenge_method: codeChallengeMethods.optional(),
prompt: z.string().optional(),
max_age: z.number().int().positive().optional(),
id_token_hint: z.string().optional(),
login_hint: z.string().optional(),
acr_values: z.string().optional(),
}).strict();
// 令牌请求Schema
export const tokenRequestSchema = z.object({
grant_type: z.string().min(1, '授权类型不能为空'),
code: z.string().optional(),
redirect_uri: z.string().url('重定向URI必须是有效的URL').optional().or(z.literal('')),
client_id: z.string().min(1, '客户端ID不能为空'),
client_secret: z.string().optional(),
refresh_token: z.string().optional(),
scope: z.string().optional(),
code_verifier: z.string().optional(),
}).strict();
// 令牌撤销请求Schema
export const revokeTokenRequestSchema = z.object({
token: z.string().min(1, '令牌不能为空'),
token_type_hint: z.enum(['access_token', 'refresh_token']).optional(),
client_id: z.string().optional(),
}).strict();
// 令牌内省请求Schema
export const introspectTokenRequestSchema = z.object({
token: z.string().min(1, '令牌不能为空'),
token_type_hint: z.enum(['access_token', 'refresh_token']).optional(),
client_id: z.string().optional(),
}).strict();
// 查询参数解析Schema用于解析URL参数
export const authorizationQuerySchema = z.record(z.string(), z.union([z.string(), z.array(z.string())])).transform((data) => {
// 将数组参数转换为单个字符串(取第一个值)
const normalized: Record<string, string> = {};
for (const [key, value] of Object.entries(data)) {
if (Array.isArray(value)) {
// 处理数组,取第一个值,如果为空则设为空字符串
normalized[key] = value[0] || '';
} else {
// 处理字符串值
normalized[key] = value || '';
}
}
return normalized;
});
// Form data解析Schema
export const tokenFormDataSchema = z.instanceof(FormData).transform((formData) => {
const result: Record<string, string> = {};
for (const [key, value] of formData.entries()) {
// 只处理字符串值忽略File类型
if (typeof value === 'string') {
result[key] = value;
} else if (value instanceof File) {
// 如果是文件将文件名作为值通常不应该在token请求中出现
result[key] = value.name || '';
}
}
return result;
});
// HTTP Authorization header Schema
export const bearerTokenSchema = z.string().regex(/^Bearer\s+(.+)$/, '无效的Bearer令牌格式').transform((auth) => {
return auth.replace(/^Bearer\s+/, '');
});
export const basicAuthSchema = z.string().regex(/^Basic\s+(.+)$/, '无效的Basic认证格式').transform((auth) => {
try {
const base64Part = auth.replace(/^Basic\s+/, '');
if (!base64Part) {
throw new Error('Basic认证缺少凭证部分');
}
const decoded = atob(base64Part);
const colonIndex = decoded.indexOf(':');
if (colonIndex === -1) {
// 如果没有冒号,整个字符串作为用户名,密码为空
return { username: decoded, password: '' };
}
const username = decoded.substring(0, colonIndex);
const password = decoded.substring(colonIndex + 1);
return { username, password };
} catch (error) {
throw new Error(`无效的Basic认证编码: ${error instanceof Error ? error.message : '未知错误'}`);
}
});