- Published on
Modern RESTful API Best Practices and Recommendations
- Authors
- Name
- Yvan Yang
现代 RESTful API 最佳实践建议
1. 架构设计建议
1.1 分层架构
src/
├── controllers/ # 请求处理层
│ ├── base.controller.ts
│ └── user.controller.ts
├── services/ # 业务逻辑层
│ ├── base.service.ts
│ └── user.service.ts
├── repositories/ # 数据访问层
│ ├── base.repository.ts
│ └── user.repository.ts
├── domain/ # 领域模型层
│ ├── entities/
│ └── value-objects/
└── infrastructure/ # 基础设施层
├── database/
├── cache/
└── queue/
1.2 API 版本策略
推荐使用三层版本控制:
// 1. URL 主版本
/api/v1/users
// 2. Accept Header 次版本
Accept: application/vnd.api.v1.1+json
// 3. 字段版本控制
{
"data": {
"id": "123",
"name_v2": "John Doe", // 新版本字段
"name": "John Doe" // 向后兼容
}
}
1.3 错误处理策略
// errors/base.error.ts
/**
 * Base class for errors the application raises on purpose.
 *
 * @param statusCode  HTTP status the error handler should respond with.
 * @param code        Stable, machine-readable error code.
 * @param message     Human-readable description (becomes `Error.message`).
 * @param details     Optional structured payload returned to the client.
 * @param operational `true` for expected failures, `false` for programmer errors.
 */
export class AppError extends Error {
  constructor(
    public statusCode: number,
    public code: string,
    message: string,
    public details?: unknown,
    public operational = true
  ) {
    super(message);
    // Keep `instanceof` working for subclasses even when compiled to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
    // Without this every subclass reports itself as "Error" in logs and stacks.
    this.name = new.target.name;
    // captureStackTrace is a V8 extension; calling it unguarded throws elsewhere.
    if (typeof Error.captureStackTrace === 'function') {
      Error.captureStackTrace(this, new.target);
    }
  }
}
// errors/application-errors.ts
/** Signals that client-supplied input was rejected; reported as HTTP 400. */
export class ValidationError extends AppError {
  constructor(message: string, details?: unknown) {
    super(
      400, // HTTP status
      'VALIDATION_ERROR', // machine-readable code
      message,
      details
    );
  }
}
/** Signals that the named resource does not exist; reported as HTTP 404. */
export class NotFoundError extends AppError {
  constructor(resource: string) {
    super(
      404, // HTTP status
      'NOT_FOUND', // machine-readable code
      `${resource} not found`
    );
  }
}
// middleware/error-handler.ts
/**
 * Express error-handling middleware.
 *
 * `AppError` instances are logged at warn level and answered with their own
 * status code and payload. Anything else is logged at error level, reported
 * to monitoring and answered with a generic 500.
 *
 * @param err  The error passed to `next(err)` or thrown by a handler.
 * @param req  Current request (unused, required by the Express signature).
 * @param res  Response used to send the error payload.
 * @param next Used to delegate when the response has already started.
 */
export function errorHandler(err: Error, req: Request, res: Response, next: NextFunction) {
  // Once headers are on the wire a second response cannot be written;
  // hand over to Express's default handler, which closes the connection.
  if (res.headersSent) {
    return next(err);
  }

  if (err instanceof AppError) {
    // Operational error: an expected failure.
    logger.warn({
      code: err.code,
      message: err.message,
      details: err.details,
      stack: err.stack,
    });
    return res.status(err.statusCode).json({
      error: {
        code: err.code,
        message: err.message,
        details: err.details,
      },
    });
  }

  // Programmer error: an unexpected failure.
  logger.error({
    error: err,
    stack: err.stack,
  });
  // Notify the monitoring system.
  notifyMonitoring(err);
  res.status(500).json({
    error: {
      code: 'INTERNAL_ERROR',
      message: 'An unexpected error occurred',
    },
  });
}
2. 性能优化建议
2.1 多层缓存策略
// services/cache.service.ts
/**
 * Two-level cache: a short-lived first level (memcached) in front of Redis.
 *
 * Values are stored as JSON text in both levels so that objects, numbers and
 * booleans survive the round trip with their original type.
 */
export class CacheService {
  /** Upper bound, in seconds, for how long an entry lives in the first level. */
  private static readonly LOCAL_TTL_SECONDS = 60;

  constructor(
    private readonly redis: Redis,
    private readonly memcached: Memcached
  ) {}

  /**
   * Looks the key up in the first level, then in Redis.
   *
   * @returns The cached value, or `null` when neither level holds the key.
   */
  async get<T>(key: string): Promise<T | null> {
    // 1. First-level cache. Compare against null/undefined explicitly so that
    //    cached falsy values are not mistaken for a miss.
    const localHit = await this.memcached.get(key);
    if (localHit !== undefined && localHit !== null) {
      return CacheService.decode<T>(localHit);
    }

    // 2. Distributed cache.
    const redisHit = await this.redis.get(key);
    if (redisHit !== undefined && redisHit !== null) {
      // Back-fill the first level with the raw payload.
      await this.memcached.set(key, redisHit, CacheService.LOCAL_TTL_SECONDS);
      return CacheService.decode<T>(redisHit);
    }
    return null;
  }

  /**
   * Writes the value to both levels.
   *
   * @param ttl Lifetime in seconds for the Redis entry; the first level never
   *            outlives it and is capped at `LOCAL_TTL_SECONDS`.
   */
  async set(key: string, value: any, ttl: number): Promise<void> {
    // `undefined` has no JSON form; store it as null, which reads back as a miss.
    const payload = JSON.stringify(value ?? null);
    const localTtl = Math.min(ttl, CacheService.LOCAL_TTL_SECONDS);
    await Promise.all([
      this.redis.set(key, payload, 'EX', ttl),
      this.memcached.set(key, payload, localTtl),
    ]);
  }

  /** Parses a stored payload; entries that are not valid JSON are returned as-is. */
  private static decode<T>(raw: unknown): T {
    if (typeof raw !== 'string') {
      return raw as T;
    }
    try {
      return JSON.parse(raw) as T;
    } catch {
      return raw as unknown as T;
    }
  }
}
2.2 查询优化策略
// repositories/base.repository.ts
/** Repository base whose `find` is fronted by the cache service. */
export class BaseRepository<T> {
  /**
   * Returns the cached rows for the query when present; otherwise runs the
   * query and, for cacheable queries, stores the rows for five minutes.
   */
  async find(query: QueryOptions): Promise<T[]> {
    const key = this.generateCacheKey(query);
    const hit = await this.cacheService.get<T[]>(key);
    if (hit) return hit;

    const rows = await this.executeQuery(query);
    if (!this.isCacheable(query)) return rows;

    await this.cacheService.set(key, rows, 300); // 300 s = 5 minutes
    return rows;
  }

  /** A query is cacheable unless it uses full-text search or complex joins. */
  private isCacheable(query: QueryOptions): boolean {
    return !(query.fullTextSearch || query.complexJoins);
  }
}
3. 安全建议
3.1 请求验证链
// middleware/validation-chain.ts
/**
 * Builds the ordered middleware list applied before a route handler:
 * sanitize, schema validation, business rules, then data consistency.
 *
 * @param schema Zod schema handed to `validateSchema`.
 */
export function createValidationChain(schema: ZodSchema) {
  const schemaValidator = validateSchema(schema);
  return [sanitizeRequest, schemaValidator, validateBusinessRules, validateDataConsistency];
}
// Route wiring: the validation chain runs before the controller action.
// NOTE(review): `userController.create` is passed unbound; confirm it does not rely on `this`.
router.post('/users', createValidationChain(createUserSchema), userController.create);
3.2 高级认证策略
// middleware/auth.ts
/**
 * Creates the authentication middleware.
 *
 * On success the loaded user is attached to `req.user` and `next()` is called.
 * On failure `next` receives an `AuthenticationError`: the specific one thrown
 * by a check below when there is one, otherwise a generic one.
 *
 * @param config Authentication options (not read by the current implementation).
 */
export function authenticate(config: AuthConfig = {}) {
  return async (req: Request, res: Response, next: NextFunction) => {
    try {
      // 1. Extract credentials.
      const token = extractToken(req);
      // 2. Verify the token.
      const decoded = await verifyToken(token);
      // 3. Check whether the token has been revoked.
      await checkTokenRevocation(token);
      // 4. Load the user.
      const user = await loadUser(decoded.sub);
      // 5. Check the user's status.
      if (!isUserActive(user)) {
        throw new AuthenticationError('User account is not active');
      }
      // 6. Check IP restrictions.
      if (!isIpAllowed(req.ip, user.allowedIps)) {
        throw new AuthenticationError('Access from this IP is not allowed');
      }
      // 7. Set the user context.
      req.user = user;
      // 8. Refresh the token if needed.
      await refreshTokenIfNeeded(req, res, token);
      next();
    } catch (error) {
      // Keep the specific reason raised above; only unknown failures are
      // collapsed into the generic message.
      next(
        error instanceof AuthenticationError
          ? error
          : new AuthenticationError('Authentication failed')
      );
    }
  };
}
4. 监控与可观测性建议
4.1 请求追踪
// middleware/tracer.ts
/**
 * Request-tracing middleware: stores trace and span ids plus a start
 * timestamp on `req.context`, and logs one summary line on response finish.
 */
export function trace(req: Request, res: Response, next: NextFunction) {
  const incomingTraceId = req.header('X-Trace-ID');
  const traceId = incomingTraceId ? incomingTraceId : uuid();
  const spanId = uuid();

  req.context = { traceId, spanId, startTime: Date.now() };

  const logCompletion = () => {
    const elapsedMs = Date.now() - req.context.startTime;
    logger.info({
      type: 'request',
      traceId,
      spanId,
      method: req.method,
      path: req.path,
      status: res.statusCode,
      duration: elapsedMs,
      userAgent: req.header('user-agent'),
      ip: req.ip,
    });
  };
  res.on('finish', logCompletion);

  next();
}
4.2 性能指标收集
// middleware/metrics.ts
/**
 * Metrics middleware: on response finish, records the request duration in
 * milliseconds and increments the request counter, both labelled with
 * method, route path and status.
 */
export function collectMetrics(req: Request, res: Response, next: NextFunction) {
  const startedAt = process.hrtime();

  // A fresh label object per metric call, read at finish time.
  const labels = () => ({
    method: req.method,
    path: req.route?.path || 'unknown',
    status: res.statusCode.toString(),
  });

  res.on('finish', () => {
    const elapsed = process.hrtime(startedAt);
    // [seconds, nanoseconds] -> milliseconds
    const duration = elapsed[0] * 1000 + elapsed[1] / 1000000;
    metrics.histogram('http_request_duration_ms', duration, labels());
    metrics.increment('http_requests_total', labels());
  });

  next();
}
5. 测试策略建议
5.1 分层测试
// tests/unit/services/user.service.test.ts
// Unit test: the service must never return the plain-text password.
describe('UserService', () => {
  describe('createUser', () => {
    it('should hash password before saving', async () => {
      const plainPassword = 'password123';
      // Shape of a bcrypt hash: "$2<variant>$<cost>$<53 chars>".
      const bcryptHash = /^\$2[aby]?\$\d{1,2}\$[.\/A-Za-z0-9]{53}$/;
      const service = new UserService(mockRepo);

      const result = await service.createUser({
        email: 'test@example.com',
        password: plainPassword,
      });

      expect(result.password).not.toBe(plainPassword);
      expect(result.password).toMatch(bcryptHash);
    });
  });
});
// tests/integration/api/users.test.ts
// Integration test: creating a user over HTTP returns 201 and an id.
describe('Users API', () => {
  describe('POST /api/v1/users', () => {
    it('should create user and return 201', async () => {
      const payload = {
        email: 'test@example.com',
        password: 'password123',
      };

      const response = await request(app).post('/api/v1/users').send(payload);

      expect(response.status).toBe(201);
      expect(response.body.data).toHaveProperty('id');
    });
  });
});
6. 部署建议
6.1 健康检查
// routes/health.ts
// Health endpoint: runs every dependency probe and answers 200 only when all
// of them report `status === 'ok'`, otherwise 503.
router.get('/health', async (req, res) => {
  // allSettled instead of all: a probe that throws must show up as a failed
  // check in a 503 body rather than reject the whole handler.
  const outcomes = await Promise.allSettled(
    [checkDatabase, checkRedis, checkExternalServices].map((probe) =>
      Promise.resolve().then(() => probe())
    )
  );
  const checks = outcomes.map((outcome) =>
    outcome.status === 'fulfilled' ? outcome.value : { status: 'error' }
  );
  const status = checks.every((check) => check.status === 'ok') ? 200 : 503;
  res.status(status).json({
    status: status === 200 ? 'healthy' : 'unhealthy',
    timestamp: new Date().toISOString(),
    checks,
    version: process.env.APP_VERSION,
  });
});
6.2 优雅关闭
// server.ts
const server = app.listen(port);

// Set on the first signal so that a second SIGTERM/SIGINT does not start a
// concurrent cleanup.
let shuttingDown = false;

/**
 * Graceful shutdown: stop accepting connections, wait for in-flight requests,
 * release the database, cache and other resources, then exit.
 * Exits with code 0 on success and 1 when any cleanup step fails.
 */
async function shutdown() {
  if (shuttingDown) {
    return;
  }
  shuttingDown = true;
  console.log('Received shutdown signal');

  let exitCode = 0;
  try {
    // 1. Stop accepting new requests.
    server.close(() => {
      console.log('Server closed');
    });
    // 2. Wait for in-flight requests to complete.
    await waitForRequestsToComplete();
    // 3. Close database connections.
    await db.end();
    // 4. Close the cache connection.
    await cache.quit();
    // 5. Release any other resources.
    await cleanup();
  } catch (error) {
    // The signal handler ignores the returned promise, so a failure has to be
    // handled here or it becomes an unhandled rejection.
    console.error('Graceful shutdown failed:', error);
    exitCode = 1;
  }
  process.exit(exitCode);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
这些建议基于实际项目经验,着重于:
可维护性:
- 清晰的代码结构
- 统一的错误处理
- 全面的日志记录
可扩展性:
- 模块化设计
- 分层架构
- 松耦合组件
性能:
- 多级缓存
- 查询优化
- 资源池化
安全性:
- 深度防御
- 多重验证
- 访问控制
可观测性:
- 分布式追踪
- 性能指标
- 健康监控
分步骤完整实现一个基于最佳实践的现代 RESTful API
第一步:项目结构和基础配置
// 项目结构
src/
├── config/ # 配置
├── controllers/ # 控制器
├── services/ # 服务层
├── repositories/ # 数据访问层
├── domain/ # 领域模型
│ ├── entities/ # 实体
│ └── interfaces/ # 接口定义
├── infrastructure/ # 基础设施
│ ├── database/ # 数据库
│ ├── cache/ # 缓存
│ └── queue/ # 消息队列
├── middleware/ # 中间件
├── utils/ # 工具函数
├── routes/ # 路由定义
└── types/ # 类型定义
// config/index.ts
import { z } from 'zod';
import dotenv from 'dotenv';
dotenv.config();
// Each configuration section is declared on its own and assembled below.
// Fields with `.default(...)` may be omitted; the rest are required.

const appSection = z.object({
  name: z.string().default('modern-api'),
  env: z.enum(['development', 'test', 'production']).default('development'),
  port: z.number().default(3000),
  apiPrefix: z.string().default('/api/v1'),
});

const dbSection = z.object({
  host: z.string().default('localhost'),
  port: z.number().default(5432),
  username: z.string(),
  password: z.string(),
  database: z.string(),
  maxConnections: z.number().default(20),
  idleTimeout: z.number().default(30000),
});

const redisSection = z.object({
  host: z.string().default('localhost'),
  port: z.number().default(6379),
  password: z.string().optional(),
  db: z.number().default(0),
});

const authSection = z.object({
  jwtSecret: z.string(),
  jwtExpiresIn: z.string().default('1h'),
  refreshTokenExpiresIn: z.string().default('7d'),
});

const corsSection = z.object({
  origins: z.array(z.string()).default(['*']),
  methods: z.array(z.string()).default(['GET', 'POST', 'PUT', 'DELETE', 'PATCH']),
});

const rateLimitSection = z.object({
  windowMs: z.number().default(15 * 60 * 1000), // 15 minutes
  max: z.number().default(100), // requests allowed per IP per window
});

const loggingSection = z.object({
  level: z.enum(['error', 'warn', 'info', 'debug']).default('info'),
  prettify: z.boolean().default(false),
});

const monitoringSection = z.object({
  enabled: z.boolean().default(true),
  metricsPath: z.string().default('/metrics'),
});

/** Schema for the whole application configuration. */
const configSchema = z.object({
  app: appSection,
  db: dbSection,
  redis: redisSection,
  auth: authSection,
  cors: corsSection,
  rateLimit: rateLimitSection,
  logging: loggingSection,
  monitoring: monitoringSection,
});

/** Configuration type, derived from the schema. */
type Config = z.infer<typeof configSchema>;
/** Base-10 integer from an env var; `undefined` when unset or empty so the schema default applies. */
function intFromEnv(value: string | undefined): number | undefined {
  return value ? parseInt(value, 10) : undefined;
}

/** Boolean from an env var (`'true'` is true); `undefined` when unset or empty so the schema default applies. */
function boolFromEnv(value: string | undefined): boolean | undefined {
  return value === undefined || value === '' ? undefined : value === 'true';
}

/** Comma-separated list from an env var, trimmed, without empty entries; `undefined` when unset. */
function listFromEnv(value: string | undefined): string[] | undefined {
  if (value === undefined) {
    return undefined;
  }
  return value
    .split(',')
    .map((item) => item.trim())
    .filter((item) => item.length > 0);
}

/**
 * Reads the configuration from environment variables and validates it with
 * `configSchema`. On a validation failure the error is printed and the
 * process exits with code 1.
 */
function loadConfig(): Config {
  const env = process.env;
  try {
    // Values are passed through unasserted: the schema is what decides
    // whether a missing or malformed value is acceptable.
    return configSchema.parse({
      app: {
        name: env.APP_NAME,
        env: env.NODE_ENV,
        port: intFromEnv(env.PORT),
        apiPrefix: env.API_PREFIX,
      },
      db: {
        host: env.DB_HOST,
        port: intFromEnv(env.DB_PORT),
        username: env.DB_USERNAME,
        password: env.DB_PASSWORD,
        database: env.DB_DATABASE,
        maxConnections: intFromEnv(env.DB_MAX_CONNECTIONS),
        idleTimeout: intFromEnv(env.DB_IDLE_TIMEOUT),
      },
      redis: {
        host: env.REDIS_HOST,
        port: intFromEnv(env.REDIS_PORT),
        password: env.REDIS_PASSWORD,
        db: intFromEnv(env.REDIS_DB),
      },
      auth: {
        jwtSecret: env.JWT_SECRET,
        jwtExpiresIn: env.JWT_EXPIRES_IN,
        refreshTokenExpiresIn: env.REFRESH_TOKEN_EXPIRES_IN,
      },
      cors: {
        origins: listFromEnv(env.CORS_ORIGINS),
        methods: listFromEnv(env.CORS_METHODS),
      },
      rateLimit: {
        windowMs: intFromEnv(env.RATE_LIMIT_WINDOW_MS),
        max: intFromEnv(env.RATE_LIMIT_MAX),
      },
      logging: {
        level: env.LOG_LEVEL,
        prettify: boolFromEnv(env.LOG_PRETTIFY),
      },
      monitoring: {
        // Must stay undefined when unset; comparing with 'true' here would
        // turn the schema's default of `true` into `false`.
        enabled: boolFromEnv(env.MONITORING_ENABLED),
        metricsPath: env.METRICS_PATH,
      },
    });
  } catch (error) {
    console.error('Configuration validation failed:', error);
    process.exit(1);
  }
}

export const config = loadConfig();
// types/global.d.ts
// Augments Express's `Request` type with the per-request fields that the
// middleware in this article attaches.
declare global {
namespace Express {
interface Request {
// Request identifier; assigned by the request-id middleware (`req.id = ...`).
id: string;
// Authenticated principal; optional because unauthenticated requests have none.
user?: {
id: string;
role: string;
permissions: string[];
};
// Tracing data; the tracer middleware assigns `req.context` with these three fields.
context: {
traceId: string;
spanId: string;
// Millisecond timestamp taken with `Date.now()` when the tracer ran.
startTime: number;
};
}
}
}
// infrastructure/logger.ts
import pino from 'pino';
import { config } from '../config';
/**
 * Application-wide pino logger.
 *
 * Level and pretty-printing come from the configuration; every record carries
 * the environment and package version.
 */
const logger = pino({
  level: config.logging.level,
  transport: config.logging.prettify
    ? {
        target: 'pino-pretty',
        options: {
          colorize: true,
          translateTime: 'HH:MM:ss Z',
          ignore: 'pid,hostname',
        },
      }
    : undefined,
  base: {
    env: config.app.env,
    version: process.env.npm_package_version,
  },
  redact: {
    // Redaction paths are exact key paths, not "any depth" matches, so the
    // nested locations the middleware logs (`req.body`, request headers)
    // must be listed explicitly alongside the top-level keys.
    paths: [
      'password',
      'token',
      'authorization',
      'cookie',
      'req.body.password',
      'req.body.token',
      'req.body.refreshToken',
      'req.headers.authorization',
      'req.headers.cookie',
    ],
    censor: '[REDACTED]',
  },
});

export { logger };
// infrastructure/database/index.ts
import { drizzle } from 'drizzle-orm/node-postgres';
import { Pool } from 'pg';
import { config } from '../../config';
import { logger } from '../logger';
/** Shared PostgreSQL connection pool, sized and timed from the configuration. */
const pool = new Pool({
  host: config.db.host,
  port: config.db.port,
  user: config.db.username,
  password: config.db.password,
  database: config.db.database,
  max: config.db.maxConnections,
  idleTimeoutMillis: config.db.idleTimeout,
  application_name: config.app.name,
});

pool.on('connect', () => {
  logger.debug('New database connection established');
});

pool.on('error', (err) => {
  // pino takes the merge object first and the message second; passing the
  // error after the message makes it a format argument and it is never
  // serialized.
  logger.error({ err }, 'Unexpected error on idle database client');
  process.exit(-1);
});

/** Drizzle database handle bound to the pool. */
export const db = drizzle(pool);
// infrastructure/cache/index.ts
import { Redis } from 'ioredis';
import { config } from '../../config';
import { logger } from '../logger';
/** Shared Redis client, configured from the application configuration. */
const redis = new Redis({
  host: config.redis.host,
  port: config.redis.port,
  password: config.redis.password,
  db: config.redis.db,
  // Delay before the next reconnect attempt: 50 ms per attempt, capped at 2 s.
  retryStrategy(times) {
    return Math.min(times * 50, 2000);
  },
  // Reconnect only when the error message mentions READONLY.
  reconnectOnError(err) {
    return err.message.includes('READONLY');
  },
});

redis.on('connect', () => {
  logger.debug('Redis client connected');
});

redis.on('error', (err) => {
  // Merge object first, message second; otherwise pino drops the error.
  logger.error({ err }, 'Redis client error');
});

export { redis };
这是第一步的实现,包含:
项目结构设计:
- 采用分层架构
- 清晰的职责划分
- 模块化组织
配置管理:
- 类型安全的配置验证
- 环境变量支持
- 合理的默认值
基础设施搭建:
- 数据库连接池
- Redis 客户端
- 日志系统
类型定义:
- Express 类型扩展
- 全局类型声明
第二步:错误处理系统和基础中间件
// src/types/error.ts
// JSON body sent for every failed request; the error-handling middleware in
// this section types its response as `Response<ErrorResponse>`.
export interface ErrorResponse {
// What went wrong.
error: {
// Machine-readable error code, e.g. 'VALIDATION_ERROR' or 'INTERNAL_ERROR'.
code: string;
// Human-readable description.
message: string;
// Optional structured payload with further detail.
details?: unknown;
};
// Request metadata for correlating the response with logs.
meta: {
// ISO-8601 timestamp (the handler fills it with `new Date().toISOString()`).
timestamp: string;
// The handler fills this from `req.id`.
request_id: string;
// The handler fills this from `req.path`.
path: string;
};
}
// src/errors/base.error.ts
/**
 * Base class for errors the application raises on purpose.
 *
 * @param statusCode    HTTP status the error handler should respond with.
 * @param code          Stable, machine-readable error code.
 * @param message       Human-readable description (becomes `Error.message`).
 * @param details       Optional structured payload returned to the client.
 * @param isOperational `true` for expected failures, `false` for programmer errors.
 */
export class AppError extends Error {
  constructor(
    public readonly statusCode: number,
    public readonly code: string,
    message: string,
    public readonly details?: unknown,
    public readonly isOperational = true
  ) {
    super(message);
    // Keep `instanceof` working for subclasses even when compiled to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
    // Without this every subclass reports itself as "Error" in logs and stacks.
    this.name = new.target.name;
    // captureStackTrace is a V8 extension; calling it unguarded throws elsewhere.
    if (typeof Error.captureStackTrace === 'function') {
      Error.captureStackTrace(this, new.target);
    }
  }
}
// src/errors/application-errors.ts
/** Client input was rejected; reported as HTTP 400 with code VALIDATION_ERROR. */
export class ValidationError extends AppError {
  constructor(message: string, details?: unknown) {
    super(
      400, // HTTP status
      'VALIDATION_ERROR', // machine-readable code
      message,
      details
    );
  }
}
/** The caller could not be authenticated; reported as HTTP 401. */
export class AuthenticationError extends AppError {
  constructor(message = 'Authentication failed') {
    super(
      401, // HTTP status
      'AUTHENTICATION_ERROR', // machine-readable code
      message
    );
  }
}
/** The caller is not permitted to perform the action; reported as HTTP 403. */
export class AuthorizationError extends AppError {
  constructor(message = 'Permission denied') {
    super(
      403, // HTTP status
      'AUTHORIZATION_ERROR', // machine-readable code
      message
    );
  }
}
/** The named resource does not exist; reported as HTTP 404. */
export class NotFoundError extends AppError {
  constructor(resource: string) {
    super(
      404, // HTTP status
      'NOT_FOUND', // machine-readable code
      `${resource} not found`
    );
  }
}
/** The request conflicts with existing state; reported as HTTP 409. */
export class ConflictError extends AppError {
  constructor(message: string) {
    super(
      409, // HTTP status
      'CONFLICT', // machine-readable code
      message
    );
  }
}
/** The caller exceeded the request quota; reported as HTTP 429. */
export class RateLimitError extends AppError {
  constructor(message = 'Too many requests') {
    super(
      429, // HTTP status
      'RATE_LIMIT_EXCEEDED', // machine-readable code
      message
    );
  }
}
// src/middleware/error.ts
import { Request, Response, NextFunction } from 'express';
import { AppError } from '../errors/base.error';
import { logger } from '../infrastructure/logger';
import { config } from '../config';
import { ErrorResponse } from '../types/error';
import { ZodError } from 'zod';
/**
 * Central Express error handler.
 *
 * Logs every error together with a summary of the request, then answers with
 * an `ErrorResponse`:
 * - `AppError`: its own status, code, message and details.
 * - `ZodError`: 400 / VALIDATION_ERROR with the issue list.
 * - anything else: 500 / INTERNAL_ERROR; message and stack are exposed only
 *   outside production.
 *
 * @param err  Error passed to `next(err)` or thrown by a handler.
 * @param req  Current request; supplies the logged summary and response meta.
 * @param res  Response used to send the payload.
 * @param next Used to delegate when the response has already started.
 */
export function errorHandler(
  err: Error,
  req: Request,
  res: Response<ErrorResponse>,
  next: NextFunction
): void {
  // NOTE(review): `body` can carry credentials; the logger's redact paths
  // need to cover nested keys such as `req.body.password`.
  logger.error({
    err,
    req: {
      method: req.method,
      url: req.url,
      params: req.params,
      query: req.query,
      body: req.body,
    },
  });

  // A response that has started cannot be replaced; let Express's default
  // handler terminate the connection.
  if (res.headersSent) {
    next(err);
    return;
  }

  // Identical for every branch, so build it once.
  const meta: ErrorResponse['meta'] = {
    timestamp: new Date().toISOString(),
    request_id: req.id,
    path: req.path,
  };

  if (err instanceof AppError) {
    res.status(err.statusCode).json({
      error: {
        code: err.code,
        message: err.message,
        details: err.details,
      },
      meta,
    });
    return;
  }

  if (err instanceof ZodError) {
    res.status(400).json({
      error: {
        code: 'VALIDATION_ERROR',
        message: 'Invalid request data',
        details: err.errors,
      },
      meta,
    });
    return;
  }

  // Unexpected errors: hide internals in production.
  const isProduction = config.app.env === 'production';
  res.status(500).json({
    error: {
      code: 'INTERNAL_ERROR',
      message: isProduction ? 'An unexpected error occurred' : err.message,
      details: isProduction ? undefined : err.stack,
    },
    meta,
  });
}
// src/middleware/request-id.ts
import { Request, Response, NextFunction } from 'express';
import { v4 as uuidv4 } from 'uuid';
// Accepts printable ASCII without whitespace, at most 200 characters. This
// keeps control characters and oversized values out of logs and the echoed
// response header.
const REQUEST_ID_PATTERN = /^[\x21-\x7E]{1,200}$/;

/**
 * Assigns `req.id` and echoes it in the `X-Request-ID` response header.
 *
 * A well-formed `X-Request-ID` request header is reused; otherwise, including
 * when the header is malformed, a fresh UUID v4 is generated.
 */
export function requestId(req: Request, res: Response, next: NextFunction): void {
  const header = req.headers['x-request-id'];
  // A repeated header arrives as an array; use its first value.
  const candidate = Array.isArray(header) ? header[0] : header;
  req.id = candidate && REQUEST_ID_PATTERN.test(candidate) ? candidate : uuidv4();
  res.setHeader('X-Request-ID', req.id);
  next();
}
// src/middleware/request-context.ts
import { Request, Response, NextFunction } from 'express';
import { v4 as uuidv4 } from 'uuid';
import cls from 'cls-hooked';
const nsid = 'request-context';

/** Continuation-local namespace holding per-request values. */
export const ns = cls.createNamespace(nsid);

/**
 * Runs the remainder of the request inside the namespace with `traceId`
 * (taken from `req.id`), a fresh `spanId` and `startTime` set.
 */
export function requestContext(req: Request, res: Response, next: NextFunction): void {
  // Bind the request and response emitters so their event listeners
  // ('finish', 'close', ...) also run with this context.
  ns.bindEmitter(req);
  ns.bindEmitter(res);
  ns.run(() => {
    ns.set('traceId', req.id);
    ns.set('spanId', uuidv4());
    ns.set('startTime', Date.now());
    next();
  });
}
// src/middleware/request-logger.ts
import { Request, Response, NextFunction } from 'express';
import { logger } from '../infrastructure/logger';
/**
 * Logs one record when a request arrives and one when its response finishes,
 * the latter including the status code and elapsed milliseconds.
 */
export function requestLogger(req: Request, res: Response, next: NextFunction): void {
  const receivedAt = Date.now();

  // Incoming request.
  const requestRecord = {
    type: 'request',
    method: req.method,
    url: req.url,
    requestId: req.id,
    userAgent: req.get('user-agent'),
    ip: req.ip,
  };
  logger.info(requestRecord);

  // Completed response; fields are read at finish time.
  const logResponse = () => {
    const duration = Date.now() - receivedAt;
    logger.info({
      type: 'response',
      method: req.method,
      url: req.url,
      status: res.statusCode,
      duration,
      requestId: req.id,
    });
  };
  res.on('finish', logResponse);

  next();
}
// src/middleware/cors.ts
import cors from 'cors';
import { config } from '../config';
// True when the configuration allows every origin.
const allowAnyOrigin = config.cors.origins.includes('*');

/**
 * CORS middleware driven by `config.cors`.
 *
 * Requests without an Origin header, and origins on the allow-list, are
 * accepted; any other origin is rejected with an error.
 */
export const corsMiddleware = cors({
  origin: (origin, callback) => {
    if (!origin || allowAnyOrigin || config.cors.origins.includes(origin)) {
      callback(null, true);
      return;
    }
    callback(new Error('Not allowed by CORS'));
  },
  methods: config.cors.methods,
  // Never combine "any origin" with credentials: the accepted origin is
  // reflected back, which would let any site make credentialed requests.
  credentials: !allowAnyOrigin,
  maxAge: 86400, // 24 hours
  exposedHeaders: ['X-Request-ID', 'X-RateLimit-Limit', 'X-RateLimit-Remaining'],
  allowedHeaders: ['Content-Type', 'Authorization', 'X-Request-ID'],
});
// src/middleware/security.ts
import helmet from 'helmet';
import { rateLimit } from 'express-rate-limit';
import { config } from '../config';
import { RateLimitError } from '../errors/application-errors';
/**
 * Security middleware bundle:
 * - `helmet`: security response headers with a restrictive CSP.
 * - `rateLimit`: per-client request quota taken from `config.rateLimit`.
 */
export const securityMiddleware = {
  helmet: helmet({
    contentSecurityPolicy: {
      directives: {
        defaultSrc: ["'self'"],
        styleSrc: ["'self'", "'unsafe-inline'"],
        scriptSrc: ["'self'"],
        imgSrc: ["'self'", "data:", "https:"],
      },
    },
    crossOriginEmbedderPolicy: true,
    crossOriginOpenerPolicy: true,
    crossOriginResourcePolicy: true,
    dnsPrefetchControl: true,
    frameguard: true,
    hidePoweredBy: true,
    hsts: true,
    ieNoOpen: true,
    noSniff: true,
    referrerPolicy: true,
    xssFilter: true,
  }),
  rateLimit: rateLimit({
    windowMs: config.rateLimit.windowMs,
    max: config.rateLimit.max,
    standardHeaders: true,
    legacyHeaders: false,
    // Route quota violations through the central error handler.
    handler: (req, res, next) => {
      next(new RateLimitError());
    },
    // No custom keyGenerator: the library default keys on `req.ip`. Reading
    // `X-Forwarded-For` directly lets a client choose its own rate-limit
    // bucket by sending a different header value on each request. Behind a
    // proxy, configure Express's `trust proxy` so `req.ip` is the client.
  }),
};
// src/middleware/validation.ts
import { Request, Response, NextFunction } from 'express';
import { ZodSchema } from 'zod';
import { ValidationError } from '../errors/application-errors';
/**
 * Creates a middleware that validates `{ body, query, params }` of the
 * request against the schema.
 *
 * - Valid input: `next()`.
 * - Schema violations: `next(new ValidationError('Validation failed', zodError))`.
 * - Any other failure while validating (for example an async refinement that
 *   throws): forwarded unchanged, so it is not misreported as a 400.
 *
 * @param schema Zod schema describing the expected `{ body, query, params }`.
 */
export function validateRequest(schema: ZodSchema) {
  return async (req: Request, res: Response, next: NextFunction): Promise<void> => {
    let outcome: Awaited<ReturnType<typeof schema.safeParseAsync>>;
    try {
      outcome = await schema.safeParseAsync({
        body: req.body,
        query: req.query,
        params: req.params,
      });
    } catch (error) {
      next(error);
      return;
    }

    if (outcome.success) {
      next();
      return;
    }
    next(new ValidationError('Validation failed', outcome.error));
  };
}
// src/middleware/async-handler.ts
import { Request, Response, NextFunction } from 'express';
/** Signature of a promise-returning Express handler. */
type AsyncRequestHandler = (
  req: Request,
  res: Response,
  next: NextFunction
) => Promise<unknown>;

/**
 * Wraps an async handler so that anything it throws or rejects with is
 * forwarded to `next` instead of being lost.
 */
export function asyncHandler(handler: AsyncRequestHandler) {
  return async function wrapped(req: Request, res: Response, next: NextFunction): Promise<void> {
    // The inner async arrow turns a synchronous throw into a rejection, so a
    // single `.catch` covers both failure modes.
    await (async () => handler(req, res, next))().catch((error) => next(error));
  };
}
// src/middleware/index.ts
// Barrel file: re-export every middleware from its own module. A bare
// `export { ... }` only works for names declared or imported in this file,
// and none of these are.
export { errorHandler } from './error';
export { requestId } from './request-id';
export { requestContext } from './request-context';
export { requestLogger } from './request-logger';
export { corsMiddleware } from './cors';
export { securityMiddleware } from './security';
export { validateRequest } from './validation';
export { asyncHandler } from './async-handler';
这一步实现了:
错误处理系统:
- 基础错误类
- 特定业务错误类
- 统一错误处理中间件
- 错误响应格式化
核心中间件:
- 请求 ID 生成
- 请求上下文管理
- 请求日志记录
- CORS 配置
- 安全头设置
- 速率限制
- 请求验证
- 异步处理包装器
安全特性:
- Helmet 安全头
- 速率限制
- CORS 策略
- XSS 防护
- CSRF 防护(注意:本步骤的代码并未包含 CSRF 中间件;若采用基于 Cookie 的会话,需另行接入)
日志和追踪:
- 请求-响应日志
- 性能追踪
- 错误详情记录
这些组件为 API 提供了:
- 安全防护
- 错误处理
- 性能监控
- 请求追踪
- 输入验证
第三步:实现领域模型、数据模式和数据访问层。
// src/domain/interfaces/repository.ts
// Generic persistence contract: T is the entity type, K the type of its id.
export interface BaseRepository<T, K> {
// Resolves to the entity with this id, or null when there is none.
findById(id: K): Promise<T | null>;
// Lists entities, optionally paged, sorted and filtered.
findAll(options?: QueryOptions): Promise<T[]>;
// Persists a new entity; `id`, `createdAt` and `updatedAt` are not supplied by the caller.
create(data: Omit<T, 'id' | 'createdAt' | 'updatedAt'>): Promise<T>;
// Applies a partial change and resolves to the updated entity.
update(id: K, data: Partial<T>): Promise<T>;
// Removes the entity; the boolean reports whether a row was deleted.
delete(id: K): Promise<boolean>;
// Counts entities, optionally restricted by the filter.
count(filter?: FilterOptions): Promise<number>;
}
// Options accepted by `findAll`.
export interface QueryOptions {
// 1-based page number; the Repository base class defaults it to 1.
page?: number;
// Page size; the Repository base class defaults it to 10.
limit?: number;
sort?: SortOptions;
filter?: FilterOptions;
// NOTE(review): `fields` and `includes` are declared here, but the Repository
// base class shown in this article never applies them.
fields?: string[];
includes?: string[];
}
// Single-column sort instruction.
export interface SortOptions {
// Property name of the column to sort by (the Repository base class looks it up on the table object).
field: string;
order: 'asc' | 'desc';
}
// Equality filter keyed by column property name. In the Repository base class
// an array value matches any of its elements (OR); all keys are combined with AND.
export interface FilterOptions {
[key: string]: any;
}
// src/domain/entities/user.entity.ts
// User account entity.
export interface User {
id: string;
email: string;
username: string;
// Stored credential; AuthService writes a bcrypt hash here, never the plain password.
password: string;
role: UserRole;
status: UserStatus;
// Absent until the first login is recorded.
lastLoginAt?: Date;
createdAt: Date;
updatedAt: Date;
}
// Roles a user can hold; the string values are what the `role` column stores.
export enum UserRole {
ADMIN = 'admin',
USER = 'user',
}
// Account lifecycle states; AuthService.login only accepts ACTIVE accounts.
export enum UserStatus {
ACTIVE = 'active',
INACTIVE = 'inactive',
SUSPENDED = 'suspended',
}
// src/domain/entities/audit.entity.ts
// One recorded change: who (`userId`) did what (`action`) to which resource.
export interface AuditLog {
id: string;
userId: string;
action: string;
// Kind of resource and the id of the affected instance.
resource: string;
resourceId: string;
// State before and after the change, when the caller supplied them.
oldValue?: unknown;
newValue?: unknown;
// Free-form extra context.
metadata?: Record<string, unknown>;
createdAt: Date;
}
// src/db/schema/index.ts
import { pgTable, uuid, varchar, timestamp, text, jsonb } from 'drizzle-orm/pg-core';
import { createInsertSchema, createSelectSchema } from 'drizzle-zod';
import { UserRole, UserStatus } from '../../domain/entities/user.entity';
// `users` table. Email is unique; role and status default to USER and ACTIVE;
// all timestamps are timezone-aware.
export const users = pgTable('users', {
id: uuid('id').defaultRandom().primaryKey(),
email: varchar('email', { length: 255 }).notNull().unique(),
username: varchar('username', { length: 50 }).notNull(),
password: text('password').notNull(),
// `$type<...>()` narrows only the TypeScript type; the column itself is a varchar.
role: varchar('role', { length: 20 }).$type<UserRole>().notNull().default(UserRole.USER),
status: varchar('status', { length: 20 }).$type<UserStatus>().notNull().default(UserStatus.ACTIVE),
// Nullable: empty until a login is recorded.
lastLoginAt: timestamp('last_login_at', { withTimezone: true }),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
// Defaults to now on insert only; Repository.update sets it explicitly on change.
updatedAt: timestamp('updated_at', { withTimezone: true }).notNull().defaultNow(),
});
// `audit_logs` table: one row per recorded change, linked to the acting user.
export const auditLogs = pgTable('audit_logs', {
id: uuid('id').defaultRandom().primaryKey(),
// Foreign key to users.id.
userId: uuid('user_id').notNull().references(() => users.id),
action: varchar('action', { length: 50 }).notNull(),
resource: varchar('resource', { length: 50 }).notNull(),
resourceId: varchar('resource_id', { length: 50 }).notNull(),
// Nullable JSON snapshots and extra context.
oldValue: jsonb('old_value'),
newValue: jsonb('new_value'),
metadata: jsonb('metadata'),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
});
// `refresh_tokens` table: issued refresh tokens with their owner and expiry.
// NOTE(review): no code in the visible part of the article reads or writes this table.
export const refreshTokens = pgTable('refresh_tokens', {
id: uuid('id').defaultRandom().primaryKey(),
// Foreign key to users.id.
userId: uuid('user_id').notNull().references(() => users.id),
token: text('token').notNull(),
expiresAt: timestamp('expires_at', { withTimezone: true }).notNull(),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
});
// Zod schemas for validation
// Validation schema for creating a user, derived from the `users` table with
// tighter rules: valid email, username 3-50 chars, password 8-100 chars,
// role optional.
export const insertUserSchema = createInsertSchema(users, {
email: (schema) => schema.email.email(),
username: (schema) => schema.username.min(3).max(50),
password: (schema) => schema.password.min(8).max(100),
role: (schema) => schema.role.optional(),
});
// Validation schema for updating a user: every field is optional, and only
// `email`, `username` and `status` are kept by the final `.pick(...)`.
// NOTE(review): the `password` and `role` overrides below are dropped by
// `.pick`, so this schema cannot validate either field; confirm that is intended.
export const updateUserSchema = createInsertSchema(users, {
email: (schema) => schema.email.email().optional(),
username: (schema) => schema.username.min(3).max(50).optional(),
password: (schema) => schema.password.min(8).max(100).optional(),
role: (schema) => schema.role.optional(),
status: (schema) => schema.status.optional(),
}).pick({ email: true, username: true, status: true });
// src/repositories/base.repository.ts
import { eq, and, or, sql } from 'drizzle-orm';
import { PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import { BaseRepository, QueryOptions, FilterOptions } from '../domain/interfaces/repository';
/**
 * Generic Drizzle-backed repository.
 *
 * Subclasses supply the table and a `mapToEntity` that converts a database
 * row into the entity type `T`; `K` is the type of the `id` column.
 */
export abstract class Repository<T, K> implements BaseRepository<T, K> {
  constructor(
    protected readonly db: PostgresJsDatabase,
    protected readonly table: any
  ) {}

  /** Converts a raw database row into an entity. */
  protected abstract mapToEntity(record: any): T;

  /** Resolves to the entity with the given id, or `null` when there is none. */
  async findById(id: K): Promise<T | null> {
    const rows = await this.db.select().from(this.table).where(eq(this.table.id, id));
    return rows[0] ? this.mapToEntity(rows[0]) : null;
  }

  /**
   * Lists entities with optional filtering, sorting and paging.
   * `page` is 1-based and defaults to 1; `limit` defaults to 10.
   *
   * @throws Error when a filter key or the sort field is not a column of the table.
   */
  async findAll(options: QueryOptions = {}): Promise<T[]> {
    const { page = 1, limit = 10, sort, filter } = options;
    // A page below 1 would give a negative OFFSET, which PostgreSQL rejects.
    const offset = Math.max(0, (page - 1) * limit);

    let query: any = this.db.select().from(this.table);

    const condition = this.buildFilterCondition(filter);
    if (condition) {
      query = query.where(condition);
    }

    if (sort) {
      const column = this.resolveColumn(sort.field);
      query = query.orderBy(sort.order === 'desc' ? sql`${column} DESC` : column);
    }

    const rows = await query.limit(limit).offset(offset);
    // Arrow function keeps `this` bound for subclasses whose mapper uses it.
    return rows.map((row: any) => this.mapToEntity(row));
  }

  /** Inserts a row and resolves to the created entity. */
  async create(data: Omit<T, 'id' | 'createdAt' | 'updatedAt'>): Promise<T> {
    const rows = await this.db.insert(this.table).values(data).returning();
    return this.mapToEntity(rows[0]);
  }

  /**
   * Applies a partial update and refreshes `updatedAt`.
   *
   * @throws Error when no row has the given id.
   */
  async update(id: K, data: Partial<T>): Promise<T> {
    const rows = await this.db
      .update(this.table)
      .set({ ...data, updatedAt: new Date() })
      .where(eq(this.table.id, id))
      .returning();
    if (!rows[0]) {
      // Fail with a clear message instead of crashing inside the mapper.
      throw new Error(`Cannot update: no record with id "${String(id)}"`);
    }
    return this.mapToEntity(rows[0]);
  }

  /** Deletes the row with the given id; resolves to `true` when a row was removed. */
  async delete(id: K): Promise<boolean> {
    const rows = await this.db
      .delete(this.table)
      .where(eq(this.table.id, id))
      .returning();
    return rows.length > 0;
  }

  /**
   * Counts rows, optionally restricted by the filter.
   *
   * @throws Error when a filter key is not a column of the table.
   */
  async count(filter?: FilterOptions): Promise<number> {
    let query: any = this.db.select({ count: sql<number>`count(*)` }).from(this.table);
    const condition = this.buildFilterCondition(filter);
    if (condition) {
      query = query.where(condition);
    }
    const rows = await query;
    return Number(rows[0].count);
  }

  /**
   * Turns a filter object into one SQL condition: keys are combined with AND,
   * and an array value matches any of its elements.
   */
  private buildFilterCondition(filter?: FilterOptions) {
    if (!filter) {
      return undefined;
    }
    const conditions = Object.entries(filter).map(([key, value]) => {
      const column = this.resolveColumn(key);
      if (Array.isArray(value)) {
        // An empty list must match nothing; `or()` with no operands would be
        // dropped from the query and silently match every row.
        return value.length > 0 ? or(...value.map((item) => eq(column, item))) : sql`false`;
      }
      return eq(column, value);
    });
    return and(...conditions);
  }

  /**
   * Looks a column up by property name and rejects anything that is not one.
   * Filter keys and sort fields usually originate from the query string, so
   * they are checked before reaching the query builder.
   * NOTE(review): assumes columns are own, object-valued properties of the
   * table object; confirm against the Drizzle version in use.
   */
  private resolveColumn(name: string) {
    const isOwn = Object.prototype.hasOwnProperty.call(this.table, name);
    const column = isOwn ? this.table[name] : undefined;
    if (column === undefined || column === null || typeof column !== 'object') {
      throw new Error(`Unknown column "${name}"`);
    }
    return column;
  }
}
// src/repositories/user.repository.ts
import { eq } from 'drizzle-orm';
import { User, UserStatus } from '../domain/entities/user.entity';
import { users } from '../db/schema';
import { Repository } from './base.repository';
import { NotFoundError } from '../errors/application-errors';
import { db } from '../infrastructure/database';
/** Repository for the `users` table. */
export class UserRepository extends Repository<User, string> {
  constructor() {
    super(db, users);
  }

  /** Maps a `users` row to a `User`; a NULL `lastLoginAt` becomes `undefined`. */
  protected mapToEntity(record: typeof users.$inferSelect): User {
    return {
      id: record.id,
      email: record.email,
      username: record.username,
      password: record.password,
      role: record.role,
      status: record.status,
      lastLoginAt: record.lastLoginAt ?? undefined,
      createdAt: record.createdAt,
      updatedAt: record.updatedAt,
    };
  }

  /** Resolves to the user with this email, or `null` when there is none. */
  async findByEmail(email: string): Promise<User | null> {
    const rows = await this.db
      .select()
      .from(users)
      .where(eq(users.email, email));
    return rows[0] ? this.mapToEntity(rows[0]) : null;
  }

  /** Lists every user whose status is ACTIVE. */
  async findActive(): Promise<User[]> {
    const rows = await this.db
      .select()
      .from(users)
      // Enum member rather than the bare literal: the column is typed as UserStatus.
      .where(eq(users.status, UserStatus.ACTIVE));
    // Arrow function keeps `this` bound to the repository.
    return rows.map((row) => this.mapToEntity(row));
  }

  /** Stamps the user's `lastLoginAt` with the current time. */
  async updateLastLogin(id: string): Promise<void> {
    await this.db
      .update(users)
      .set({ lastLoginAt: new Date() })
      .where(eq(users.id, id));
  }
}
// src/repositories/audit.repository.ts
import { AuditLog } from '../domain/entities/audit.entity';
import { auditLogs } from '../db/schema';
import { Repository } from './base.repository';
import { db } from '../infrastructure/database';
/** Repository for the `audit_logs` table. */
export class AuditRepository extends Repository<AuditLog, string> {
  constructor() {
    super(db, auditLogs);
  }

  /** Copies an `audit_logs` row into an `AuditLog`, field by field. */
  protected mapToEntity(record: typeof auditLogs.$inferSelect): AuditLog {
    const {
      id,
      userId,
      action,
      resource,
      resourceId,
      oldValue,
      newValue,
      metadata,
      createdAt,
    } = record;
    return { id, userId, action, resource, resourceId, oldValue, newValue, metadata, createdAt };
  }

  /**
   * Records one change.
   *
   * @param userId     Who performed the action.
   * @param action     What was done.
   * @param resource   Kind of resource affected.
   * @param resourceId Id of the affected resource.
   * @param oldValue   State before the change, if known.
   * @param newValue   State after the change, if known.
   * @param metadata   Extra context.
   * @returns The stored audit entry.
   */
  async logChange(
    userId: string,
    action: string,
    resource: string,
    resourceId: string,
    oldValue?: unknown,
    newValue?: unknown,
    metadata?: Record<string, unknown>
  ): Promise<AuditLog> {
    const entry = { userId, action, resource, resourceId, oldValue, newValue, metadata };
    return this.create(entry);
  }
}
// src/repositories/index.ts
export { UserRepository } from './user.repository';
export { AuditRepository } from './audit.repository';
这一步实现了:
领域模型:
- 用户实体
- 审计日志实体
- 基础接口定义
数据库模式:
- 用户表
- 审计日志表
- 刷新令牌表
- Schema 验证
数据访问层:
- 通用仓储基类
- 用户仓储实现
- 审计日志仓储实现
- 类型安全查询
主要特点:
类型安全:
- 完整的 TypeScript 类型
- Zod schema 验证
- Drizzle ORM 类型推断
数据完整性:
- 外键约束
- 唯一性约束
- 默认值设置
查询能力:
- 分页
- 排序
- 过滤
- 字段选择(QueryOptions 已预留 `fields` 参数,但 findAll 的当前实现尚未使用它)
审计能力:
- 变更追踪
- 操作日志
- 数据历史
第四步:实现认证和授权系统
// src/types/auth.ts
// Claims carried inside the JWTs issued by TokenService.
export interface TokenPayload {
sub: string; // user id
role: string;
// Granted permissions; TokenService issues refresh tokens with an empty list.
permissions: string[];
// Distinguishes access tokens from refresh tokens.
type: 'access' | 'refresh';
jti: string; // token id
}
// What AuthService.login returns: both tokens plus a public view of the user
// (the password field is not part of it).
export interface LoginResult {
accessToken: string;
refreshToken: string;
user: {
id: string;
email: string;
username: string;
role: string;
};
}
// src/domain/services/token.service.ts
import jwt from 'jsonwebtoken';
import { Redis } from 'ioredis';
import { v4 as uuidv4 } from 'uuid';
import { config } from '../../config';
import { TokenPayload } from '../../types/auth';
import { User } from '../entities/user.entity';
/**
 * Issues, verifies and revokes JWT access and refresh tokens.
 *
 * Redis layout:
 * - `token:access:<jti>` / `token:refresh:<jti>` hold the owning user id and
 *   expire together with the token.
 * - `token:blacklist:<jti>` marks a revoked token; `verifyToken` rejects any
 *   token whose id is blacklisted.
 */
export class TokenService {
  /** Seconds per unit for `ms`-style lifetime strings such as '1h' or '7d'. */
  private static readonly SECONDS_PER_UNIT: Readonly<Record<string, number>> = {
    ms: 0.001,
    s: 1,
    m: 60,
    h: 3600,
    d: 86400,
    w: 604800,
    y: 31557600,
  };

  /** Number of keys requested per SCAN iteration. */
  private static readonly SCAN_BATCH = 100;

  constructor(private readonly redis: Redis) {}

  /**
   * Issues an access/refresh token pair and records both token ids in Redis
   * for exactly as long as the corresponding JWT is valid.
   */
  async generateTokens(user: User): Promise<{ accessToken: string; refreshToken: string }> {
    const accessTokenId = uuidv4();
    const refreshTokenId = uuidv4();
    const accessToken = await this.generateAccessToken(user, accessTokenId);
    const refreshToken = await this.generateRefreshToken(user, refreshTokenId);

    // Redis TTLs are derived from the same settings that drive the JWT
    // expiry, so the two cannot drift apart.
    await this.redis.setex(
      `token:access:${accessTokenId}`,
      TokenService.toSeconds(config.auth.jwtExpiresIn),
      user.id
    );
    await this.redis.setex(
      `token:refresh:${refreshTokenId}`,
      TokenService.toSeconds(config.auth.refreshTokenExpiresIn),
      user.id
    );
    return { accessToken, refreshToken };
  }

  /** Signs an access token carrying the user's role and permissions. */
  private async generateAccessToken(user: User, tokenId: string): Promise<string> {
    const payload: TokenPayload = {
      sub: user.id,
      role: user.role,
      permissions: await this.getUserPermissions(user),
      type: 'access',
      jti: tokenId,
    };
    return jwt.sign(payload, config.auth.jwtSecret, {
      expiresIn: config.auth.jwtExpiresIn,
      algorithm: 'HS256',
    });
  }

  /** Signs a refresh token; it carries no permissions. */
  private async generateRefreshToken(user: User, tokenId: string): Promise<string> {
    const payload: TokenPayload = {
      sub: user.id,
      role: user.role,
      permissions: [],
      type: 'refresh',
      jti: tokenId,
    };
    return jwt.sign(payload, config.auth.jwtSecret, {
      expiresIn: config.auth.refreshTokenExpiresIn,
      algorithm: 'HS256',
    });
  }

  /**
   * Verifies signature and expiry, then rejects blacklisted tokens.
   *
   * @throws Error('Token has been revoked') for a blacklisted token id; any
   *         error raised by `jwt.verify` propagates unchanged.
   */
  async verifyToken(token: string): Promise<TokenPayload> {
    // Accept only the algorithm the tokens are signed with.
    const decoded = jwt.verify(token, config.auth.jwtSecret, {
      algorithms: ['HS256'],
    }) as TokenPayload;

    const isBlacklisted = await this.redis.exists(`token:blacklist:${decoded.jti}`);
    if (isBlacklisted) {
      throw new Error('Token has been revoked');
    }
    return decoded;
  }

  /** Blacklists one token id for as long as any token can remain valid. */
  async revokeToken(tokenId: string): Promise<void> {
    await this.redis.setex(
      `token:blacklist:${tokenId}`,
      TokenService.longestLifetimeSeconds(),
      '1'
    );
  }

  /**
   * Revokes every token recorded for the user.
   *
   * Token keys end in the token id and store the user id as their value, so
   * matching on the key name can never find a user's tokens. This walks the
   * token keys with SCAN (which, unlike KEYS, does not block Redis),
   * blacklists the ids owned by the user and removes their records.
   */
  async revokeAllUserTokens(userId: string): Promise<void> {
    const blacklistTtl = TokenService.longestLifetimeSeconds();
    let cursor = '0';
    do {
      const [nextCursor, keys] = await this.redis.scan(
        cursor,
        'MATCH',
        'token:*',
        'COUNT',
        TokenService.SCAN_BATCH
      );
      cursor = nextCursor;

      const tokenKeys = keys.filter(
        (key) => key.startsWith('token:access:') || key.startsWith('token:refresh:')
      );
      if (tokenKeys.length === 0) {
        continue;
      }

      const owners = await this.redis.mget(...tokenKeys);
      const pipeline = this.redis.pipeline();
      let queued = 0;
      tokenKeys.forEach((key, index) => {
        if (owners[index] !== userId) {
          return;
        }
        const tokenId = key.slice(key.lastIndexOf(':') + 1);
        pipeline.setex(`token:blacklist:${tokenId}`, blacklistTtl, '1');
        pipeline.del(key);
        queued += 1;
      });
      if (queued > 0) {
        await pipeline.exec();
      }
    } while (cursor !== '0');
  }

  /** Permissions granted by the user's role; unknown roles get none. */
  private async getUserPermissions(user: User): Promise<string[]> {
    // Permissions could be loaded from the database or configuration here.
    const rolePermissions: Record<string, string[]> = {
      admin: ['*'],
      user: ['read:own', 'write:own'],
    };
    return rolePermissions[user.role] || [];
  }

  /** The longer of the two configured token lifetimes, in seconds. */
  private static longestLifetimeSeconds(): number {
    return Math.max(
      TokenService.toSeconds(config.auth.jwtExpiresIn),
      TokenService.toSeconds(config.auth.refreshTokenExpiresIn)
    );
  }

  /**
   * Converts a JWT lifetime setting to whole seconds (at least 1) for use as
   * a Redis TTL. Numbers are taken as seconds; strings follow the `ms`
   * convention used by jsonwebtoken ('30s', '15m', '1h', '7d', '2 days'),
   * where a string without a unit means milliseconds.
   *
   * @throws Error when the string is not in a recognized format.
   */
  private static toSeconds(lifetime: string | number): number {
    if (typeof lifetime === 'number') {
      return Math.max(1, Math.ceil(lifetime));
    }
    const match =
      /^(\d+(?:\.\d+)?)\s*(milliseconds?|msecs?|ms|seconds?|secs?|s|minutes?|mins?|m|hours?|hrs?|h|days?|d|weeks?|w|years?|yrs?|y)?$/i.exec(
        lifetime.trim()
      );
    const amount = match?.[1];
    const rawUnit = (match?.[2] ?? 'ms').toLowerCase();
    // Millisecond spellings first, because they share the leading "m" with minutes.
    const unit = /^(?:ms|mil)/.test(rawUnit) ? 'ms' : rawUnit.charAt(0);
    const factor = TokenService.SECONDS_PER_UNIT[unit];
    if (amount === undefined || factor === undefined) {
      throw new Error(`Unsupported token lifetime format: "${lifetime}"`);
    }
    return Math.max(1, Math.ceil(Number(amount) * factor));
  }
}
// src/services/auth.service.ts
import bcrypt from 'bcryptjs';
import { UserRepository } from '../repositories/user.repository';
import { TokenService } from '../domain/services/token.service';
import { LoginResult } from '../types/auth';
import { User, UserStatus } from '../domain/entities/user.entity';
import {
AuthenticationError,
ValidationError
} from '../errors/application-errors';
/**
 * Authentication workflows: login, registration, token refresh and logout.
 * Persistence is delegated to `UserRepository`, token handling to
 * `TokenService`.
 */
export class AuthService {
  constructor(
    private readonly userRepository: UserRepository,
    private readonly tokenService: TokenService
  ) {}

  /**
   * Authenticates a user by email and password and issues a token pair.
   *
   * @param email - Email used to look the account up.
   * @param password - Plain-text password to check against the stored hash.
   * @returns The access/refresh tokens and a public summary of the user.
   * @throws AuthenticationError with "Invalid credentials" when the account
   *   does not exist or the password is wrong, and with "Account is not
   *   active" when the credentials are valid but the account is not ACTIVE.
   */
  async login(email: string, password: string): Promise<LoginResult> {
    // Find user
    const user = await this.userRepository.findByEmail(email);
    if (!user) {
      throw new AuthenticationError('Invalid credentials');
    }

    // Verify the password BEFORE looking at the account status. Checking the
    // status first would tell anyone who merely knows an email address that
    // the account exists and is inactive.
    const isPasswordValid = await bcrypt.compare(password, user.password);
    if (!isPasswordValid) {
      throw new AuthenticationError('Invalid credentials');
    }

    // Check user status (only disclosed to callers holding valid credentials)
    if (user.status !== UserStatus.ACTIVE) {
      throw new AuthenticationError('Account is not active');
    }

    // Generate tokens
    const { accessToken, refreshToken } = await this.tokenService.generateTokens(user);

    // Update last login
    await this.userRepository.updateLastLogin(user.id);

    return {
      accessToken,
      refreshToken,
      user: {
        id: user.id,
        email: user.email,
        username: user.username,
        role: user.role,
      },
    };
  }

  /**
   * Creates a new active account with role `user` and a bcrypt-hashed password.
   *
   * NOTE(review): the email check and the insert are separate steps, so two
   * concurrent registrations could both pass the check — presumably a unique
   * constraint in the database backs this up; confirm.
   *
   * @param userData - Email, username and plain-text password.
   * @returns The created user as returned by the repository (callers should
   *   not expose its `password` field).
   * @throws ValidationError when the email is already registered.
   */
  async register(userData: {
    email: string;
    username: string;
    password: string;
  }): Promise<User> {
    // Check if email already exists
    const existingUser = await this.userRepository.findByEmail(userData.email);
    if (existingUser) {
      throw new ValidationError('Email already exists');
    }

    // Hash password
    const hashedPassword = await bcrypt.hash(userData.password, 10);

    // Create user
    const user = await this.userRepository.create({
      ...userData,
      password: hashedPassword,
      role: 'user',
      status: UserStatus.ACTIVE,
    });
    return user;
  }

  /**
   * Exchanges a valid refresh token for a new access token.
   *
   * NOTE(review): `generateTokens` also produces a refresh token that is
   * discarded here — confirm it has no side effects that need cleaning up.
   *
   * @param refreshToken - Encoded refresh JWT.
   * @returns An object holding the new access token.
   * @throws AuthenticationError when the token is not a refresh token or the
   *   user is missing or inactive; verification errors from `TokenService`
   *   propagate unchanged.
   */
  async refreshToken(refreshToken: string): Promise<{ accessToken: string }> {
    // Verify refresh token
    const payload = await this.tokenService.verifyToken(refreshToken);
    if (payload.type !== 'refresh') {
      throw new AuthenticationError('Invalid token type');
    }

    // Get user
    const user = await this.userRepository.findById(payload.sub);
    if (!user || user.status !== UserStatus.ACTIVE) {
      throw new AuthenticationError('User not found or inactive');
    }

    // Generate new access token
    const { accessToken } = await this.tokenService.generateTokens(user);
    return { accessToken };
  }

  /**
   * Revokes a single token.
   *
   * @param userId - Id of the user logging out (currently unused).
   * @param tokenId - `jti` of the token to revoke.
   */
  async logout(userId: string, tokenId: string): Promise<void> {
    await this.tokenService.revokeToken(tokenId);
  }

  /**
   * Revokes all tokens of a user via `TokenService.revokeAllUserTokens`.
   *
   * @param userId - Id of the user whose tokens are revoked.
   */
  async logoutAll(userId: string): Promise<void> {
    await this.tokenService.revokeAllUserTokens(userId);
  }
}
// src/middleware/auth.ts
import { Request, Response, NextFunction } from 'express';
import { TokenService } from '../domain/services/token.service';
import { AuthenticationError, AuthorizationError } from '../errors/application-errors';
/**
 * Builds an Express middleware that authenticates requests carrying an
 * `Authorization: Bearer <token>` header.
 *
 * On success it sets `req.user` (id, role, permissions) and `req.token` (the
 * decoded payload, whose `jti` the logout controller reads), then calls
 * `next()`. On failure it forwards an `AuthenticationError` to `next`.
 *
 * @param tokenService - Service used to verify the token.
 * @returns The middleware function.
 */
export function authenticate(tokenService: TokenService) {
  return async (req: Request, res: Response, next: NextFunction): Promise<void> => {
    try {
      const authHeader = req.headers.authorization;
      if (!authHeader?.startsWith('Bearer ')) {
        throw new AuthenticationError('No token provided');
      }
      const token = authHeader.slice('Bearer '.length).trim();
      if (!token) {
        throw new AuthenticationError('No token provided');
      }

      const payload = await tokenService.verifyToken(token);

      // Refresh tokens are signed with the same secret; without this check a
      // long-lived refresh token would be accepted as an API credential.
      if (payload.type !== 'access') {
        throw new AuthenticationError('Invalid token type');
      }

      req.user = {
        id: payload.sub,
        role: payload.role,
        permissions: payload.permissions,
      };
      // Expose the decoded payload for handlers that need the jti (logout).
      Object.assign(req, { token: payload });
    } catch (error) {
      // Keep the specific message of our own errors; anything else (bad
      // signature, expiry, revoked token) is reported generically.
      next(error instanceof AuthenticationError ? error : new AuthenticationError('Invalid token'));
      return;
    }
    // Called outside the try block so an error raised downstream is never
    // misreported as an invalid token.
    next();
  };
}
/**
 * Builds a middleware that requires the authenticated user to hold every
 * listed permission. A user holding the wildcard permission `'*'` passes
 * unconditionally.
 *
 * @param permissions - One permission name or a list of them.
 * @returns The middleware function.
 * @throws AuthenticationError when `req.user` is not set.
 * @throws AuthorizationError when a required permission is missing.
 */
export function authorize(permissions: string | string[]) {
  return (req: Request, res: Response, next: NextFunction): void => {
    if (!req.user) {
      throw new AuthenticationError('User not authenticated');
    }

    let required: string[];
    if (Array.isArray(permissions)) {
      required = permissions;
    } else {
      required = [permissions];
    }

    // Wildcard holders skip the per-permission check.
    if (req.user.permissions.includes('*')) {
      next();
      return;
    }

    // Fail on the first required permission the user does not hold.
    for (const permission of required) {
      if (!req.user!.permissions.includes(permission)) {
        throw new AuthorizationError('Insufficient permissions');
      }
    }
    next();
  };
}
// src/middleware/ownership.ts
/**
 * Builds a middleware that only lets users act on their own resource.
 *
 * The route parameter named by `resourceKey` is compared with `req.user.id`;
 * users whose role is `admin` bypass the comparison.
 *
 * @param resourceKey - Name of the route parameter holding the owner id.
 * @returns The middleware function.
 * @throws AuthenticationError when `req.user` is not set.
 * @throws AuthorizationError when the ids differ and the user is not an admin.
 */
export function checkOwnership(resourceKey: string = 'id') {
  return (req: Request, res: Response, next: NextFunction): void => {
    const currentUser = req.user;
    if (!currentUser) {
      throw new AuthenticationError('User not authenticated');
    }
    const resourceId = req.params[resourceKey];

    const isAdmin = currentUser.role === 'admin';
    const isOwner = resourceId === currentUser.id;
    if (!isAdmin && !isOwner) {
      throw new AuthorizationError('Access denied');
    }
    next();
  };
}
我们还需要添加测试:
// src/__tests__/services/auth.service.test.ts
import { AuthService } from '../../services/auth.service';
import { TokenService } from '../../domain/services/token.service';
import { UserRepository } from '../../repositories/user.repository';
import { AuthenticationError } from '../../errors/application-errors';
import { UserStatus } from '../../domain/entities/user.entity';
// Unit tests for AuthService.login with mocked repository and token service.
describe('AuthService', () => {
  let authService: AuthService;
  let userRepository: jest.Mocked<UserRepository>;
  let tokenService: jest.Mocked<TokenService>;

  beforeEach(() => {
    // Only the members used by login() are mocked.
    userRepository = {
      findByEmail: jest.fn(),
      updateLastLogin: jest.fn(),
    } as any;
    tokenService = {
      generateTokens: jest.fn(),
      verifyToken: jest.fn(),
    } as any;
    authService = new AuthService(userRepository, tokenService);
  });

  describe('login', () => {
    it('should authenticate valid credentials', async () => {
      // bcryptjs is loaded here because this test file has no static import
      // for it; the hash must be real so AuthService's compare() succeeds.
      const { hash } = await import('bcryptjs');
      const mockUser = {
        id: '1',
        email: 'test@example.com',
        password: await hash('password123', 10),
        status: UserStatus.ACTIVE,
      };
      userRepository.findByEmail.mockResolvedValue(mockUser);
      tokenService.generateTokens.mockResolvedValue({
        accessToken: 'access-token',
        refreshToken: 'refresh-token',
      });

      const result = await authService.login('test@example.com', 'password123');

      expect(result).toHaveProperty('accessToken', 'access-token');
      expect(result).toHaveProperty('refreshToken', 'refresh-token');
      expect(userRepository.updateLastLogin).toHaveBeenCalledWith(mockUser.id);
    });

    it('should reject invalid credentials', async () => {
      // Unknown email: the repository finds no user.
      userRepository.findByEmail.mockResolvedValue(null);
      await expect(
        authService.login('test@example.com', 'wrong-password')
      ).rejects.toThrow(AuthenticationError);
    });

    it('should reject a wrong password for an existing user', async () => {
      const { hash } = await import('bcryptjs');
      const mockUser = {
        id: '1',
        email: 'test@example.com',
        password: await hash('password123', 10),
        status: UserStatus.ACTIVE,
      };
      userRepository.findByEmail.mockResolvedValue(mockUser);

      await expect(
        authService.login('test@example.com', 'wrong-password')
      ).rejects.toThrow(AuthenticationError);
      // No tokens are issued and the login timestamp is left untouched.
      expect(tokenService.generateTokens).not.toHaveBeenCalled();
      expect(userRepository.updateLastLogin).not.toHaveBeenCalled();
    });
  });
});
// src/__tests__/middleware/auth.test.ts
import { authenticate, authorize } from '../../middleware/auth';
import { TokenService } from '../../domain/services/token.service';
import { AuthenticationError, AuthorizationError } from '../../errors/application-errors';
// Unit tests for the authenticate middleware with a mocked TokenService.
describe('Auth Middleware', () => {
  let tokenService: jest.Mocked<TokenService>;
  let req: any;
  let res: any;
  let next: jest.Mock;

  beforeEach(() => {
    tokenService = {
      verifyToken: jest.fn(),
    } as any;
    req = {
      headers: {},
    };
    res = {};
    next = jest.fn();
  });

  describe('authenticate', () => {
    it('should authenticate valid token', async () => {
      const mockPayload = {
        sub: '1',
        role: 'user',
        permissions: ['read:own'],
        type: 'access',
        jti: 'token-1',
      };
      req.headers.authorization = 'Bearer valid-token';
      tokenService.verifyToken.mockResolvedValue(mockPayload as any);

      await authenticate(tokenService)(req, res, next);

      // The middleware maps the `sub` claim to `id`, so req.user is not the
      // raw payload.
      expect(req.user).toEqual({
        id: mockPayload.sub,
        role: mockPayload.role,
        permissions: mockPayload.permissions,
      });
      expect(tokenService.verifyToken).toHaveBeenCalledWith('valid-token');
      // Success means next() is called without an error argument.
      expect(next).toHaveBeenCalledWith();
    });

    it('should reject missing token', async () => {
      await authenticate(tokenService)(req, res, next);
      expect(next).toHaveBeenCalledWith(expect.any(AuthenticationError));
    });
  });
});
这一步实现了:
令牌管理:
- 访问令牌和刷新令牌生成
- 令牌验证和撤销
- Redis 黑名单管理
认证服务:
- 用户登录
- 用户注册
- 令牌刷新
- 登出功能
认证中间件:
- JWT 验证
- 权限检查
- 资源所有权验证
安全特性:
- 密码哈希
- 令牌黑名单
- 用户状态检查
- 权限系统
测试覆盖:
- 服务层测试
- 中间件测试
- 模拟依赖
第五步:实现业务服务层和事件系统。
// src/events/event-emitter.ts
import { EventEmitter } from 'events';
import { logger } from '../infrastructure/logger';
export type EventHandler<T = any> = (data: T) => Promise<void> | void;
/**
 * Application-wide event bus: a Node `EventEmitter` that logs every emitted
 * event at debug level and allows up to 20 listeners per event.
 */
class AppEventEmitter extends EventEmitter {
  constructor() {
    super();
    this.setMaxListeners(20);
  }

  /**
   * Logs the event and dispatches it to the registered listeners.
   *
   * The signature matches `EventEmitter.emit` so this is a valid override:
   * the previous `async` version returned `Promise<boolean>`, which is not
   * assignable to the base method's `boolean`, and it dropped any argument
   * after the first. Callers that `await` the result keep working, because
   * awaiting a plain boolean yields that boolean.
   *
   * Listeners are invoked synchronously; promises returned by async
   * listeners are not awaited here.
   *
   * @param eventName - Name of the event.
   * @param args - Payload forwarded to each listener.
   * @returns `true` if the event had listeners, `false` otherwise.
   */
  emit(eventName: string | symbol, ...args: unknown[]): boolean {
    // Keep the log shape of the single-payload case ({ event, data }).
    const data = args.length > 1 ? args : args[0];
    logger.debug({ event: eventName, data }, 'Event emitted');
    return super.emit(eventName, ...args);
  }
}

/** Shared emitter instance used by services and event handlers. */
export const eventEmitter = new AppEventEmitter();
// src/events/event-types.ts
/**
 * Names of the user-related domain events published on the event bus.
 * The string values are the event names listeners subscribe to.
 */
export enum EventTypes {
  USER_CREATED = 'user.created',
  USER_UPDATED = 'user.updated',
  USER_DELETED = 'user.deleted',
  USER_LOGIN = 'user.login',
  USER_LOGOUT = 'user.logout',
  USER_PASSWORD_CHANGED = 'user.password_changed',
  USER_EMAIL_CHANGED = 'user.email_changed',
  /** Emitted when a user's status is changed; previously only a raw string. */
  USER_STATUS_CHANGED = 'user.status_changed',
}
// src/services/base.service.ts
import { BaseRepository, QueryOptions } from '../domain/interfaces/repository';
import { NotFoundError } from '../errors/application-errors';
import { eventEmitter } from '../events/event-emitter';
import { AuditRepository } from '../repositories/audit.repository';
import { CacheService } from '../infrastructure/cache/cache.service';
/**
 * Generic CRUD service that layers caching, audit logging and event emission
 * on top of a repository.
 *
 * Cache keys are `<entityName>:<id>` for single entities and
 * `<entityName>:list:<JSON of the query options>` for list results. Events
 * are emitted as `<entityName>.created`, `.updated` and `.deleted`.
 */
export abstract class BaseService<T, K> {
  constructor(
    protected readonly repository: BaseRepository<T, K>,
    protected readonly cacheService: CacheService,
    protected readonly auditRepository: AuditRepository,
    protected readonly entityName: string
  ) {}

  /**
   * Returns the entity with the given id, reading through the cache.
   *
   * @throws NotFoundError when the repository has no such entity.
   */
  async findById(id: K): Promise<T> {
    const key = `${this.entityName}:${id}`;

    const hit = await this.cacheService.get<T>(key);
    if (hit) {
      return hit;
    }

    const found = await this.repository.findById(id);
    if (!found) {
      throw new NotFoundError(this.entityName);
    }

    // Populate the cache for subsequent reads.
    await this.cacheService.set(key, found);
    return found;
  }

  /**
   * Returns one page of entities plus the total count, reading through the
   * cache. The count is taken with `options.filter` only.
   */
  async findAll(options?: QueryOptions): Promise<{ items: T[]; total: number }> {
    const key = `${this.entityName}:list:${JSON.stringify(options)}`;

    const hit = await this.cacheService.get<{ items: T[]; total: number }>(key);
    if (hit) {
      return hit;
    }

    // The page query and the count are independent, so run them concurrently.
    const itemsQuery = this.repository.findAll(options);
    const countQuery = this.repository.count(options?.filter);
    const [items, total] = await Promise.all([itemsQuery, countQuery]);

    const page = { items, total };
    await this.cacheService.set(key, page);
    return page;
  }

  /**
   * Creates an entity, writes a CREATE audit record, clears the list caches
   * and emits `<entityName>.created` with the new entity as payload.
   *
   * @param data - Fields of the new entity.
   * @param userId - Id of the acting user, recorded in the audit log.
   */
  async create(data: Partial<T>, userId: string): Promise<T> {
    const created = await this.repository.create(data);

    await this.auditRepository.logChange(
      userId,
      'CREATE',
      this.entityName,
      (created as any).id,
      null,
      created
    );

    await this.clearListCache();
    await eventEmitter.emit(`${this.entityName}.created`, created);
    return created;
  }

  /**
   * Updates an entity, writes an UPDATE audit record with the before/after
   * state, clears the affected caches and emits `<entityName>.updated` with
   * `{ old, new }`.
   *
   * @throws NotFoundError when the entity does not exist.
   */
  async update(id: K, data: Partial<T>, userId: string): Promise<T> {
    const before = await this.repository.findById(id);
    if (!before) {
      throw new NotFoundError(this.entityName);
    }

    const after = await this.repository.update(id, data);

    await this.auditRepository.logChange(
      userId,
      'UPDATE',
      this.entityName,
      id as string,
      before,
      after
    );

    // Both the single-entity entry and any cached list may now be stale.
    await Promise.all([this.clearEntityCache(id), this.clearListCache()]);

    await eventEmitter.emit(`${this.entityName}.updated`, {
      old: before,
      new: after,
    });
    return after;
  }

  /**
   * Deletes an entity, writes a DELETE audit record, clears the affected
   * caches and emits `<entityName>.deleted` with the removed entity.
   *
   * @throws NotFoundError when the entity does not exist.
   */
  async delete(id: K, userId: string): Promise<void> {
    const removed = await this.repository.findById(id);
    if (!removed) {
      throw new NotFoundError(this.entityName);
    }

    await this.repository.delete(id);

    await this.auditRepository.logChange(
      userId,
      'DELETE',
      this.entityName,
      id as string,
      removed,
      null
    );

    await Promise.all([this.clearEntityCache(id), this.clearListCache()]);
    await eventEmitter.emit(`${this.entityName}.deleted`, removed);
  }

  /** Removes the cache entry of a single entity. */
  protected async clearEntityCache(id: K): Promise<void> {
    const key = `${this.entityName}:${id}`;
    await this.cacheService.delete(key);
  }

  /** Removes every cached list result of this entity type. */
  protected async clearListCache(): Promise<void> {
    await this.cacheService.deletePattern(`${this.entityName}:list:*`);
  }
}
// src/services/user.service.ts
import { User, UserStatus } from '../domain/entities/user.entity';
import { UserRepository } from '../repositories/user.repository';
import { BaseService } from './base.service';
import { ValidationError } from '../errors/application-errors';
import { EventTypes } from '../events/event-types';
import bcrypt from 'bcryptjs';
/**
 * User-specific operations on top of the generic CRUD service: password,
 * email and status changes. Each one goes through `BaseService.update` and
 * then emits a dedicated event.
 */
export class UserService extends BaseService<User, string> {
  constructor(
    private readonly userRepository: UserRepository,
    cacheService: CacheService,
    auditRepository: AuditRepository
  ) {
    super(userRepository, cacheService, auditRepository, 'user');
  }

  /**
   * Replaces a user's password after verifying the current one.
   *
   * @throws NotFoundError when the user does not exist.
   * @throws ValidationError when `oldPassword` does not match.
   */
  async changePassword(
    userId: string,
    oldPassword: string,
    newPassword: string
  ): Promise<void> {
    // Read straight from the repository rather than the cached findById.
    const account = await this.userRepository.findById(userId);
    if (!account) {
      throw new NotFoundError('User');
    }

    const matches = await bcrypt.compare(oldPassword, account.password);
    if (!matches) {
      throw new ValidationError('Current password is incorrect');
    }

    const password = await bcrypt.hash(newPassword, 10);
    await this.update(userId, { password }, userId);

    const event = { userId, timestamp: new Date() };
    await eventEmitter.emit(EventTypes.USER_PASSWORD_CHANGED, event);
  }

  /**
   * Changes a user's email after verifying the password and checking that
   * the new address is not already in use.
   *
   * @throws NotFoundError when the user does not exist.
   * @throws ValidationError when the password is wrong or the email is taken.
   */
  async changeEmail(
    userId: string,
    newEmail: string,
    password: string
  ): Promise<void> {
    const account = await this.userRepository.findById(userId);
    if (!account) {
      throw new NotFoundError('User');
    }

    const matches = await bcrypt.compare(password, account.password);
    if (!matches) {
      throw new ValidationError('Password is incorrect');
    }

    const owner = await this.userRepository.findByEmail(newEmail);
    if (owner) {
      throw new ValidationError('Email already exists');
    }

    await this.update(userId, { email: newEmail }, userId);

    await eventEmitter.emit(EventTypes.USER_EMAIL_CHANGED, {
      userId,
      oldEmail: account.email,
      newEmail,
      timestamp: new Date(),
    });
  }

  /**
   * Sets a user's status on behalf of an administrator and emits
   * `user.status_changed`.
   *
   * @param adminId - Id of the acting administrator, recorded in the audit
   *   log and in the event.
   * @throws NotFoundError when the user does not exist.
   */
  async changeStatus(
    userId: string,
    status: UserStatus,
    adminId: string
  ): Promise<void> {
    const account = await this.userRepository.findById(userId);
    if (!account) {
      throw new NotFoundError('User');
    }

    await this.update(userId, { status }, adminId);

    await eventEmitter.emit(`user.status_changed`, {
      userId,
      oldStatus: account.status,
      newStatus: status,
      changedBy: adminId,
      timestamp: new Date(),
    });
  }
}
// src/events/handlers/user.handler.ts
import { EventTypes } from '../event-types';
import { eventEmitter } from '../event-emitter';
import { logger } from '../../infrastructure/logger';
import { EmailService } from '../../infrastructure/email/email.service';
import { NotificationService } from '../../infrastructure/notification/notification.service';
/**
 * Subscribes to user events on the shared event bus and turns them into
 * emails and notifications. Every handler catches and logs its own failures.
 */
export class UserEventHandler {
  constructor(
    private readonly emailService: EmailService,
    private readonly notificationService: NotificationService
  ) {
    this.registerHandlers();
  }

  /** Attaches the handlers below to their event names. */
  private registerHandlers(): void {
    eventEmitter.on(EventTypes.USER_CREATED, this.handleUserCreated.bind(this));
    eventEmitter.on(EventTypes.USER_EMAIL_CHANGED, this.handleEmailChanged.bind(this));
    eventEmitter.on(EventTypes.USER_PASSWORD_CHANGED, this.handlePasswordChanged.bind(this));
    eventEmitter.on(EventTypes.USER_LOGIN, this.handleUserLogin.bind(this));
  }

  /**
   * Sends the welcome email to a newly created user.
   *
   * The service layer emits the created entity itself as the payload, while
   * this handler used to expect it wrapped as `{ user }`. Reading
   * `data.user.email` therefore threw, and the catch block threw again on
   * `data.user.id`, leaving an unhandled rejection. Both shapes are accepted.
   *
   * @param payload - The created user, either bare or wrapped as `{ user }`.
   */
  private async handleUserCreated(payload: User | { user: User }): Promise<void> {
    const user = 'user' in payload ? payload.user : payload;
    try {
      await this.emailService.sendWelcomeEmail(user.email);
      logger.info({ userId: user.id }, 'Welcome email sent to new user');
    } catch (error) {
      logger.error({ error, userId: user.id }, 'Failed to send welcome email');
    }
  }

  /**
   * Notifies both the old and the new address about an email change.
   *
   * NOTE(review): the old address receives the "confirmation" and the new
   * one the "notification" — confirm this is not swapped.
   */
  private async handleEmailChanged(data: {
    userId: string;
    oldEmail: string;
    newEmail: string;
  }): Promise<void> {
    try {
      await Promise.all([
        this.emailService.sendEmailChangeConfirmation(data.oldEmail),
        this.emailService.sendEmailChangeNotification(data.newEmail),
      ]);
      logger.info({ userId: data.userId }, 'Email change notifications sent');
    } catch (error) {
      logger.error(
        { error, userId: data.userId },
        'Failed to send email change notifications'
      );
    }
  }

  /** Sends a security alert telling the user their password was changed. */
  private async handlePasswordChanged(data: { userId: string }): Promise<void> {
    try {
      await this.notificationService.sendSecurityAlert(
        data.userId,
        'Your password has been changed'
      );
      logger.info({ userId: data.userId }, 'Password change notification sent');
    } catch (error) {
      logger.error(
        { error, userId: data.userId },
        'Failed to send password change notification'
      );
    }
  }

  /** Sends a login alert with the IP, user agent and current time. */
  private async handleUserLogin(data: {
    userId: string;
    ip: string;
    userAgent: string;
  }): Promise<void> {
    try {
      await this.notificationService.sendLoginAlert(
        data.userId,
        {
          ip: data.ip,
          userAgent: data.userAgent,
          timestamp: new Date(),
        }
      );
      logger.info({ userId: data.userId }, 'Login notification sent');
    } catch (error) {
      logger.error(
        { error, userId: data.userId },
        'Failed to send login notification'
      );
    }
  }
}
现在让我们添加服务注册和依赖注入的配置:
// src/container.ts
import { Container } from 'typedi';
import { UserRepository } from './repositories/user.repository';
import { AuditRepository } from './repositories/audit.repository';
import { UserService } from './services/user.service';
import { AuthService } from './services/auth.service';
import { TokenService } from './domain/services/token.service';
import { CacheService } from './infrastructure/cache/cache.service';
import { EmailService } from './infrastructure/email/email.service';
import { NotificationService } from './infrastructure/notification/notification.service';
import { UserEventHandler } from './events/handlers/user.handler';
import { redis } from './infrastructure/cache';
// Register services
// Composition root: every dependency is built once here and registered in
// the typedi container under its class (or the 'redis' string token).
//
// NOTE(review): the route modules call Container.get(UserController) and
// Container.get(AuthController), but neither controller is registered here —
// confirm they are resolvable some other way.

// Infrastructure
const cacheService = new CacheService(redis);
const tokenService = new TokenService(redis);
const emailService = new EmailService();
const notificationService = new NotificationService();
Container.set('redis', redis);
Container.set(CacheService, cacheService);
Container.set(TokenService, tokenService);
Container.set(EmailService, emailService);
Container.set(NotificationService, notificationService);

// Repositories
const userRepository = new UserRepository();
const auditRepository = new AuditRepository();
Container.set(UserRepository, userRepository);
Container.set(AuditRepository, auditRepository);

// Business services
const userService = new UserService(userRepository, cacheService, auditRepository);
Container.set(UserService, userService);
const authService = new AuthService(userRepository, tokenService);
Container.set(AuthService, authService);

// Event handlers (constructing the handler subscribes it to the event bus)
const userEventHandler = new UserEventHandler(emailService, notificationService);
Container.set(UserEventHandler, userEventHandler);

export { Container };
这一步实现了:
基础服务层:
- 通用 CRUD 操作
- 缓存管理
- 审计日志
- 事件发射
用户服务:
- 密码管理
- 邮箱管理
- 状态管理
- 业务验证
事件系统:
- 事件定义
- 事件发布
- 事件处理
- 异步通知
依赖注入:
- 服务容器
- 依赖管理
- 服务生命周期
主要特点:
缓存管理:
- 多级缓存
- 缓存失效
- 缓存同步
事件驱动:
- 解耦业务逻辑
- 异步处理
- 可扩展性
审计追踪:
- 变更记录
- 操作日志
- 数据历史
业务逻辑:
- 数据验证
- 业务规则
- 安全检查
第六步:实现 API 控制器、路由配置和 OpenAPI 文档。
// src/controllers/base.controller.ts
import { Request, Response, NextFunction } from 'express';
import { BaseService } from '../services/base.service';
import { QueryOptions } from '../domain/interfaces/repository';
import { logger } from '../infrastructure/logger';
/**
 * Generic REST controller exposing list/get/create/update/delete on top of a
 * `BaseService`. Errors are forwarded to Express via `next`.
 */
export abstract class BaseController<T, K> {
  /** Page size used when the client sends no (or an invalid) `limit`. */
  private static readonly DEFAULT_PAGE_SIZE = 10;
  /** Upper bound for `limit`, so one request cannot ask for an unbounded page. */
  private static readonly MAX_PAGE_SIZE = 100;

  constructor(protected readonly service: BaseService<T, K>) {}

  /**
   * Lists entities with pagination, sorting, filtering and field selection.
   *
   * Query parameters:
   * - `page`: 1-based page number; invalid or non-positive values fall back to 1.
   * - `limit`: page size; invalid values fall back to 10, values above 100 are capped.
   * - `sort`: `<field>:<asc|desc>`; any direction other than `desc` means `asc`.
   * - `filter`: a JSON object; anything else is answered with 400.
   * - `fields`: comma-separated field names.
   */
  async getAll(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const page = BaseController.parsePositiveInt(req.query.page, 1);
      const limit = Math.min(
        BaseController.parsePositiveInt(req.query.limit, BaseController.DEFAULT_PAGE_SIZE),
        BaseController.MAX_PAGE_SIZE
      );

      // Query parameters that must survive in the pagination links.
      const carried: Record<string, string> = {};

      let sort: { field: string; order: 'asc' | 'desc' } | undefined;
      if (typeof req.query.sort === 'string' && req.query.sort !== '') {
        const [field, direction] = req.query.sort.split(':');
        if (field) {
          // Never pass an arbitrary client string on as the sort direction.
          sort = { field, order: direction === 'desc' ? 'desc' : 'asc' };
          carried.sort = req.query.sort;
        }
      }

      let filter: QueryOptions['filter'] = undefined;
      if (typeof req.query.filter === 'string' && req.query.filter !== '') {
        const parsed = BaseController.parseFilter(req.query.filter);
        if (!parsed.ok) {
          // Malformed client input is a 400, not an internal error.
          res.status(400).json({
            error: {
              code: 'VALIDATION_ERROR',
              message: 'Query parameter "filter" must be a JSON object',
            },
          });
          return;
        }
        filter = parsed.value;
        carried.filter = req.query.filter;
      }

      let fields: string[] | undefined;
      if (typeof req.query.fields === 'string' && req.query.fields !== '') {
        fields = req.query.fields.split(',');
        carried.fields = req.query.fields;
      }

      const options: QueryOptions = { page, limit, sort, filter, fields };
      const result = await this.service.findAll(options);

      const baseUrl = `${req.protocol}://${req.get('host')}${req.baseUrl}`;
      const totalPages = Math.ceil(result.total / limit);

      res.json({
        data: result.items,
        meta: {
          total: result.total,
          page,
          limit,
          totalPages,
        },
        links: this.generatePaginationLinks(baseUrl, page, totalPages, limit, carried),
      });
    } catch (error) {
      next(error);
    }
  }

  /** Returns the entity identified by the `id` route parameter. */
  async getById(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const id = req.params.id as any;
      const entity = await this.service.findById(id);
      res.json({
        data: entity,
      });
    } catch (error) {
      next(error);
    }
  }

  /** Creates an entity from the request body; responds with 201. */
  async create(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const userId = req.user!.id;
      const entity = await this.service.create(req.body, userId);
      res.status(201).json({
        data: entity,
      });
    } catch (error) {
      next(error);
    }
  }

  /** Updates the entity identified by `id` with the request body. */
  async update(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const id = req.params.id as any;
      const userId = req.user!.id;
      const entity = await this.service.update(id, req.body, userId);
      res.json({
        data: entity,
      });
    } catch (error) {
      next(error);
    }
  }

  /** Deletes the entity identified by `id`; responds with 204. */
  async delete(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const id = req.params.id as any;
      const userId = req.user!.id;
      await this.service.delete(id, userId);
      res.status(204).send();
    } catch (error) {
      next(error);
    }
  }

  /**
   * Parses a query value as an integer >= 1, returning `fallback` for
   * anything else (missing, non-string, non-numeric, zero, negative).
   */
  private static parsePositiveInt(raw: unknown, fallback: number): number {
    if (typeof raw !== 'string') {
      return fallback;
    }
    const parsed = Number.parseInt(raw, 10);
    return Number.isInteger(parsed) && parsed >= 1 ? parsed : fallback;
  }

  /**
   * Parses the `filter` query value; it is valid only if it is JSON and
   * decodes to a plain object (not an array, primitive or null).
   */
  private static parseFilter(
    raw: string
  ): { ok: true; value: QueryOptions['filter'] } | { ok: false } {
    try {
      const parsed: unknown = JSON.parse(raw);
      if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
        return { ok: false };
      }
      return { ok: true, value: parsed as QueryOptions['filter'] };
    } catch {
      return { ok: false };
    }
  }

  /**
   * Builds the `self`/`first`/`prev`/`next`/`last` links of a page.
   *
   * @param extraQuery - Additional query parameters (sort, filter, fields)
   *   repeated on every link so that following a link keeps the same view.
   * @returns A map of link names to URLs; links that do not apply are `null`.
   */
  private generatePaginationLinks(
    baseUrl: string,
    currentPage: number,
    totalPages: number,
    limit: number,
    extraQuery: Record<string, string> = {}
  ): Record<string, string | null> {
    // An empty result set still has a "page 1"; never link to page 0.
    const lastPage = Math.max(totalPages, 1);
    const linkTo = (page: number): string => {
      const query = new URLSearchParams({
        page: String(page),
        limit: String(limit),
        ...extraQuery,
      });
      return `${baseUrl}?${query.toString()}`;
    };
    return {
      self: linkTo(currentPage),
      first: currentPage === 1 ? null : linkTo(1),
      // When the client is past the end, point back at the last real page.
      prev: currentPage > 1 ? linkTo(Math.min(currentPage - 1, lastPage)) : null,
      next: currentPage < totalPages ? linkTo(currentPage + 1) : null,
      last: currentPage === lastPage ? null : linkTo(lastPage),
    };
  }
}
// src/controllers/user.controller.ts
import { Request, Response, NextFunction } from 'express';
import { UserService } from '../services/user.service';
import { BaseController } from './base.controller';
import { User } from '../domain/entities/user.entity';
import { logger } from '../infrastructure/logger';
import { metrics } from '../infrastructure/metrics';
/**
 * HTTP endpoints for user-specific operations (password, email, status) in
 * addition to the inherited CRUD handlers. Each handler logs the outcome,
 * bumps a metric counter and forwards failures to `next`.
 */
export class UserController extends BaseController<User, string> {
  constructor(private readonly userService: UserService) {
    super(userService);
  }

  /** Changes the authenticated user's password (`oldPassword`, `newPassword` in the body). */
  async changePassword(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const body = req.body;
      const oldPassword = body.oldPassword;
      const newPassword = body.newPassword;
      const userId = req.user!.id;

      await this.userService.changePassword(userId, oldPassword, newPassword);

      logger.info({ userId }, 'Password changed successfully');
      metrics.increment('user.password_changes');
      res.status(200).json({ message: 'Password changed successfully' });
    } catch (error) {
      next(error);
    }
  }

  /** Changes the authenticated user's email (`newEmail`, `password` in the body). */
  async changeEmail(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const body = req.body;
      const newEmail = body.newEmail;
      const password = body.password;
      const userId = req.user!.id;

      await this.userService.changeEmail(userId, newEmail, password);

      logger.info({ userId }, 'Email changed successfully');
      metrics.increment('user.email_changes');
      res.status(200).json({ message: 'Email changed successfully' });
    } catch (error) {
      next(error);
    }
  }

  /**
   * Changes the status of the user named by the `id` route parameter; the
   * authenticated user is recorded as the acting administrator.
   */
  async changeStatus(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const status = req.body.status;
      const userId = req.params.id;
      const adminId = req.user!.id;

      await this.userService.changeStatus(userId, status, adminId);

      logger.info({ userId, adminId, status }, 'User status changed');
      metrics.increment('user.status_changes');
      res.status(200).json({ message: 'User status changed successfully' });
    } catch (error) {
      next(error);
    }
  }
}
// src/controllers/auth.controller.ts
import { Request, Response, NextFunction } from 'express';
import { AuthService } from '../services/auth.service';
import { logger } from '../infrastructure/logger';
import { metrics } from '../infrastructure/metrics';
/**
 * HTTP endpoints for registration, login, token refresh and logout. Each
 * handler logs the outcome, bumps a metric counter and forwards failures to
 * `next`.
 */
export class AuthController {
  constructor(private readonly authService: AuthService) {}

  /** Registers a new user from the request body; responds with 201. */
  async register(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const user = await this.authService.register(req.body);
      logger.info({ userId: user.id }, 'New user registered');
      metrics.increment('auth.registrations');

      // The service returns the stored entity including its password hash;
      // that field must never be sent back to the client.
      const { password: _password, ...publicUser } = user;
      res.status(201).json({
        data: publicUser,
      });
    } catch (error) {
      next(error);
    }
  }

  /** Logs a user in with `email` and `password` from the body. */
  async login(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const { email, password } = req.body;
      const result = await this.authService.login(email, password);
      logger.info({ userId: result.user.id }, 'User logged in');
      metrics.increment('auth.logins');
      res.json({
        data: result,
      });
    } catch (error) {
      metrics.increment('auth.login_failures');
      next(error);
    }
  }

  /** Issues a new access token for the `refreshToken` given in the body. */
  async refreshToken(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const { refreshToken } = req.body;
      const result = await this.authService.refreshToken(refreshToken);
      // req.user is only set when an authentication middleware ran before.
      logger.info({ userId: req.user?.id }, 'Token refreshed');
      metrics.increment('auth.token_refreshes');
      res.json({
        data: result,
      });
    } catch (error) {
      next(error);
    }
  }

  /**
   * Revokes the token of the current request; responds with 204.
   *
   * Relies on `req.user` and `req.token` being set by an authentication
   * middleware mounted before this handler; if they are missing, the
   * resulting error is forwarded to `next`.
   */
  async logout(req: Request, res: Response, next: NextFunction): Promise<void> {
    try {
      const userId = req.user!.id;
      const tokenId = req.token!.jti;
      await this.authService.logout(userId, tokenId);
      logger.info({ userId }, 'User logged out');
      metrics.increment('auth.logouts');
      res.status(204).send();
    } catch (error) {
      next(error);
    }
  }
}
// src/routes/index.ts
import { Router } from 'express';
import { Container } from '../container';
import { TokenService } from '../domain/services/token.service';
import { authenticate } from '../middleware/auth';
import { rateLimiter } from '../middleware/rate-limiter';
import { authRouter } from './auth.routes';
import { docsRouter } from './docs.routes';
import { userRouter } from './user.routes';
/** Root API router: mounts the feature routers and the health endpoint. */
export const router = Router();

// Public routes
router.use('/auth', rateLimiter, authRouter);
router.use('/docs', docsRouter);

// Protected routes.
// `authenticate` is a factory: it must be called with a TokenService to
// obtain the actual middleware. Mounting the factory itself would make
// Express call it with (req, res, next); it would return a function without
// ever calling next(), so every /users request would hang.
router.use('/users', authenticate(Container.get(TokenService)), userRouter);

// Health check
router.get('/health', (_req, res) => {
  res.json({
    status: 'ok',
    timestamp: new Date().toISOString(),
    // npm only sets this variable when the process is started through npm.
    version: process.env.npm_package_version ?? 'unknown',
  });
});
// src/routes/user.routes.ts
import { Router } from 'express';
import { Container } from '../container';
import { UserController } from '../controllers/user.controller';
import { authorize } from '../middleware/auth';
import { validateRequest } from '../middleware/validation';
import { updateUserSchema } from '../db/schema';
// Routes for the user resource; every route is guarded by a permission check.
//
// NOTE(review): POST / validates the body with `updateUserSchema` — a create
// schema looks intended; confirm.
// NOTE(review): the permission names used here ('user.read', 'admin', ...)
// do not appear in the role table of the token service ('*', 'read:own',
// 'write:own'), so presumably only wildcard holders pass — confirm.
const router = Router();
const userController = Container.get(UserController);

// Collection routes
router
  .route('/')
  .get(authorize('user.read'), (req, res, next) => userController.getAll(req, res, next))
  .post(
    authorize('user.create'),
    validateRequest(updateUserSchema),
    (req, res, next) => userController.create(req, res, next)
  );

// Single-entity routes
router
  .route('/:id')
  .get(authorize('user.read'), (req, res, next) => userController.getById(req, res, next))
  .patch(
    authorize('user.update'),
    validateRequest(updateUserSchema),
    (req, res, next) => userController.update(req, res, next)
  )
  .delete(authorize('user.delete'), (req, res, next) => userController.delete(req, res, next));

// Account actions
router.post('/:id/password', authorize('user.update'), (req, res, next) =>
  userController.changePassword(req, res, next)
);
router.post('/:id/email', authorize('user.update'), (req, res, next) =>
  userController.changeEmail(req, res, next)
);
router.post('/:id/status', authorize('admin'), (req, res, next) =>
  userController.changeStatus(req, res, next)
);

export { router as userRouter };
// src/routes/auth.routes.ts
import { Router } from 'express';
import { Container } from '../container';
import { AuthController } from '../controllers/auth.controller';
import { validateRequest } from '../middleware/validation';
import { LoginSchema, RegisterSchema } from '../schemas/auth.schema';
// Routes for authentication. Register and login validate their bodies
// against a schema before reaching the controller.
//
// NOTE(review): /logout reads req.user and req.token in the controller, but
// no authentication middleware is mounted on it here — confirm one runs
// before this router.
const router = Router();
const authController = Container.get(AuthController);

router.post('/register', validateRequest(RegisterSchema), (req, res, next) =>
  authController.register(req, res, next)
);
router.post('/login', validateRequest(LoginSchema), (req, res, next) =>
  authController.login(req, res, next)
);
router.post('/refresh-token', (req, res, next) => authController.refreshToken(req, res, next));
router.post('/logout', (req, res, next) => authController.logout(req, res, next));

export { router as authRouter };
让我们添加 OpenAPI 文档配置:
// src/docs/openapi.ts
import { OpenAPIObject } from 'openapi3-ts/oas31';
import { createApiDescription } from '@anatine/zod-openapi';
import { LoginSchema, RegisterSchema } from '../schemas/auth.schema';
import { insertUserSchema, updateUserSchema } from '../db/schema';
/**
 * OpenAPI 3.1 description of the API. Request/response schemas are derived
 * from the zod schemas. Bearer authentication is required by default;
 * public operations opt out with an empty `security` list.
 *
 * NOTE(review): confirm `createApiDescription` is actually exported by the
 * installed version of `@anatine/zod-openapi`.
 */
export const openApiDocument: OpenAPIObject = {
  openapi: '3.1.0',
  info: {
    title: 'Modern REST API',
    version: '1.0.0',
    description: 'A modern RESTful API with complete features',
  },
  servers: [
    {
      url: '/api/v1',
      description: 'API v1',
    },
  ],
  tags: [
    { name: 'Auth', description: 'Authentication endpoints' },
    { name: 'Users', description: 'User management' },
  ],
  paths: {
    '/auth/register': {
      post: {
        tags: ['Auth'],
        summary: 'Register a new user',
        // Registration is a public endpoint: override the document-level
        // bearer requirement, otherwise the docs claim a token is needed.
        security: [],
        requestBody: {
          required: true,
          content: {
            'application/json': {
              schema: createApiDescription(RegisterSchema),
            },
          },
        },
        responses: {
          201: {
            description: 'User created successfully',
            content: {
              'application/json': {
                schema: {
                  type: 'object',
                  properties: {
                    data: {
                      $ref: '#/components/schemas/User',
                    },
                  },
                },
              },
            },
          },
          400: {
            $ref: '#/components/responses/ValidationError',
          },
        },
      },
    },
    // ... More path definitions
  },
  components: {
    schemas: {
      // The public user shape never includes the password.
      User: createApiDescription(insertUserSchema.omit({ password: true })),
      UpdateUser: createApiDescription(updateUserSchema),
      Error: {
        type: 'object',
        properties: {
          error: {
            type: 'object',
            properties: {
              code: { type: 'string' },
              message: { type: 'string' },
              details: { type: 'object' },
            },
            required: ['code', 'message'],
          },
        },
      },
    },
    securitySchemes: {
      bearerAuth: {
        type: 'http',
        scheme: 'bearer',
        bearerFormat: 'JWT',
      },
    },
    responses: {
      ValidationError: {
        description: 'Validation error',
        content: {
          'application/json': {
            schema: {
              $ref: '#/components/schemas/Error',
            },
          },
        },
      },
      // ... More response definitions
    },
  },
  // Default requirement for every operation that does not override it.
  security: [
    {
      bearerAuth: [],
    },
  ],
};
// src/routes/docs.routes.ts
import { Router } from 'express';
import swaggerUi from 'swagger-ui-express';
import { openApiDocument } from '../docs/openapi';
import { config } from '../config';
// Serves the Swagger UI for the OpenAPI document. Outside production only:
// in production the router stays empty.
const router = Router();

const docsEnabled = config.app.env !== 'production';
if (docsEnabled) {
  const uiOptions = {
    customCss: '.swagger-ui .topbar { display: none }',
    customSiteTitle: 'API Documentation',
  };
  // `serve` delivers the static UI assets, `setup` renders the page itself.
  router.use('/', swaggerUi.serve);
  router.get('/', swaggerUi.setup(openApiDocument, uiOptions));
}

export { router as docsRouter };
这一步实现了:
控制器层:
- 基础 CRUD 控制器
- 用户管理控制器
- 认证控制器
- 统一响应格式
路由配置:
- 模块化路由
- 权限控制
- 请求验证
- 路由中间件
API 文档:
- OpenAPI 规范
- Swagger UI
- 自动类型生成
- 示例响应
第七步:实现监控、指标收集和健康检查系统
// src/monitoring/metrics.ts
import promClient from 'prom-client';
import { Request, Response } from 'express';
// Dedicated registry for this application's metrics
export const register = new promClient.Registry();

// Default labels attached to every metric in this registry
register.setDefaultLabels({
  app: 'modern-rest-api',
});

// Every metric is bound to `register` through the `registers` option.
// The previous version registered them afterwards in a loop that tested
// `metric instanceof promClient.Metric`; `Metric` is only a type in
// prom-client, so that check throws at module load. Without `registers`
// the metrics would also be added to the library's global default registry.
const registers = [register];

// Metric definitions
export const metrics = {
  // HTTP request duration
  httpRequestDurationSeconds: new promClient.Histogram({
    name: 'http_request_duration_seconds',
    help: 'Duration of HTTP requests in seconds',
    labelNames: ['method', 'route', 'status_code'],
    buckets: [0.1, 0.5, 1, 2, 5],
    registers,
  }),
  // Total request counter
  httpRequestsTotal: new promClient.Counter({
    name: 'http_requests_total',
    help: 'Total number of HTTP requests',
    labelNames: ['method', 'route', 'status_code'],
    registers,
  }),
  // API error counter
  apiErrorsTotal: new promClient.Counter({
    name: 'api_errors_total',
    help: 'Total number of API errors',
    labelNames: ['type', 'route'],
    registers,
  }),
  // Active user gauge
  activeUsers: new promClient.Gauge({
    name: 'active_users',
    help: 'Number of active users',
    registers,
  }),
  // Database connection pool size
  dbConnectionPoolSize: new promClient.Gauge({
    name: 'db_connection_pool_size',
    help: 'Database connection pool size',
    registers,
  }),
  // Cache hit ratio
  cacheHitRatio: new promClient.Gauge({
    name: 'cache_hit_ratio',
    help: 'Cache hit ratio',
    registers,
  }),
  // Authentication metrics
  authMetrics: {
    loginAttempts: new promClient.Counter({
      name: 'auth_login_attempts_total',
      help: 'Total number of login attempts',
      labelNames: ['status'],
      registers,
    }),
    tokenRefreshes: new promClient.Counter({
      name: 'auth_token_refreshes_total',
      help: 'Total number of token refreshes',
      registers,
    }),
  },
};
// src/monitoring/health.ts
import { Request, Response } from 'express';
import { Redis } from 'ioredis';
import { db } from '../infrastructure/database';
import { redis } from '../infrastructure/cache';
import { logger } from '../infrastructure/logger';
/** Outcome of one dependency or resource check. */
interface HealthCheck {
  /** `'ok'` when the check passed, `'error'` otherwise. */
  status: 'ok' | 'error';
  /** Optional check-specific information (error message, usage figures, ...). */
  details?: unknown;
}

/** Aggregated health report returned by the health monitor. */
interface SystemHealth {
  /** Overall verdict derived from the individual checks. */
  status: 'healthy' | 'unhealthy' | 'degraded';
  /** ISO-8601 time at which the report was produced. */
  timestamp: string;
  /** Application version, or a placeholder when it is not known. */
  version: string;
  /** Individual check results keyed by check name. */
  checks: Record<string, HealthCheck>;
}
export class HealthMonitor {
constructor(
private readonly db: typeof db,
private readonly redis: Redis
) {}
/**
 * Runs the database, Redis, disk and memory checks concurrently and combines
 * them into one report.
 *
 * @returns The overall status, a timestamp, the application version and the
 *   individual results under `database`, `cache`, `disk` and `memory`.
 */
async checkHealth(): Promise<SystemHealth> {
  const results = await Promise.all([
    this.checkDatabase(),
    this.checkRedis(),
    this.checkDiskSpace(),
    this.checkMemoryUsage(),
  ]);
  // Results arrive in the same order as the checks were started.
  const [database, cache, disk, memory] = results;

  return {
    status: this.determineOverallStatus(results),
    timestamp: new Date().toISOString(),
    version: process.env.npm_package_version || 'unknown',
    checks: { database, cache, disk, memory },
  };
}
private async checkDatabase(): Promise<HealthCheck> {
try {
await db.execute(sql`SELECT 1`);
return { status: 'ok' };
} catch (error) {
logger.error('Database health check failed:', error);
return {
status: 'error',
details: error instanceof Error ? error.message : 'Unknown error',
};
}
}
private async checkRedis(): Promise<HealthCheck> {
try {
const result = await redis.ping();
return { status: result === 'PONG' ? 'ok' : 'error' };
} catch (error) {
logger.error('Redis health check failed:', error);
return {
status: 'error',
details: error instanceof Error ? error.message : 'Unknown error',
};
}
}
private async checkDiskSpace(): Promise<HealthCheck> {
try {
const { freemem, totalmem } = process;
const usagePercent = (1 - freemem() / totalmem()) * 100;
return {
status: usagePercent < 90 ? 'ok' : 'error',
details: {
usagePercent: Math.round(usagePercent),
free: this.formatBytes(freemem()),
total: this.formatBytes(totalmem()),
},
};
} catch (error) {
logger.error('Disk space check failed:', error);
return { status: 'error', details: error };
}
}
private async checkMemoryUsage(): Promise<HealthCheck> {
try {
const usage = process.memoryUsage();
const heapUsed = usage.heapUsed / usage.heapTotal;
return {
status: heapUsed < 0.9 ? 'ok' : 'error',
details: {
heapUsed: this.formatBytes(usage.heapUsed),
heapTotal: this.formatBytes(usage.heapTotal),
external: this.formatBytes(usage.external),
rss: this.formatBytes(usage.rss),
},
};
} catch (error) {
logger.error('Memory usage check failed:', error);
return { status: 'error', details: error };
}
}
private determineOverallStatus(checks: HealthCheck[]): SystemHealth['status'] {
const errorCount = checks.filter(check => check.status === 'error').length;
if (errorCount === 0) return 'healthy';
if (errorCount === checks.length) return 'unhealthy';
return 'degraded';
}
private formatBytes(bytes: number): string {
const units = ['B', 'KB', 'MB', 'GB', 'TB'];
let value = bytes;
let unitIndex = 0;
while (value >= 1024 && unitIndex < units.length - 1) {
value /= 1024;
unitIndex++;
}
return `${value.toFixed(2)}${units[unitIndex]}`;
}
}
// src/middleware/metrics.ts
import { Request, Response, NextFunction } from 'express';
import { metrics } from '../monitoring/metrics';
/**
 * Creates an Express middleware that, once the response emits 'finish',
 * records the request duration, increments the request counter and — for
 * 4xx/5xx status codes — increments the API error counter.
 *
 * The route label is the matched route path, or 'unknown' when no route
 * matched.
 */
export function metricsMiddleware() {
  return (req: Request, res: Response, next: NextFunction) => {
    const startedAt = process.hrtime();

    const recordMetrics = () => {
      const [secs, nanos] = process.hrtime(startedAt);
      const elapsedSeconds = secs + nanos / 1e9;
      const labels = {
        method: req.method,
        route: req.route?.path || 'unknown',
        status_code: res.statusCode.toString(),
      };

      // Request duration
      metrics.httpRequestDurationSeconds.observe({ ...labels }, elapsedSeconds);
      // Request count
      metrics.httpRequestsTotal.inc({ ...labels });

      // Error count: 4xx is a client error, 5xx a server error
      const leadingDigit = labels.status_code.charAt(0);
      if (leadingDigit === '4' || leadingDigit === '5') {
        metrics.apiErrorsTotal.inc({
          type: leadingDigit === '4' ? 'client' : 'server',
          route: labels.route,
        });
      }
    };

    res.on('finish', recordMetrics);
    next();
  };
}
// src/routes/monitoring.routes.ts
import { Router } from 'express';
import { register } from '../monitoring/metrics';
import { HealthMonitor } from '../monitoring/health';
import { Container } from '../container';
import { authenticate, authorize } from '../middleware/auth';
const router = Router();
const healthMonitor = Container.get(HealthMonitor);

// Health endpoint: returns the full report. 'healthy' and 'degraded' both
// answer 200; only 'unhealthy' (or a failure to run the checks) answers 503.
router.get('/health', async (req, res) => {
  try {
    const health = await healthMonitor.checkHealth();
    res.status(health.status === 'unhealthy' ? 503 : 200).json(health);
  } catch (error) {
    res.status(503).json({
      status: 'unhealthy',
      timestamp: new Date().toISOString(),
      error: error instanceof Error ? error.message : 'Unknown error',
    });
  }
});

// Liveness probe: answers 200 without consulting any dependency.
router.get('/liveness', (req, res) => {
  res.status(200).json({
    status: 'ok',
    timestamp: new Date().toISOString(),
  });
});

// Readiness probe: ready unless the system is 'unhealthy'. The status code
// and the body's `status` field are derived from the same decision, so a
// degraded system no longer answers HTTP 200 with a 'not_ready' body.
router.get('/readiness', async (req, res) => {
  try {
    const health = await healthMonitor.checkHealth();
    const ready = health.status !== 'unhealthy';
    res.status(ready ? 200 : 503).json({
      status: ready ? 'ok' : 'not_ready',
      timestamp: new Date().toISOString(),
      checks: health.checks,
    });
  } catch (error) {
    res.status(503).json({
      status: 'not_ready',
      timestamp: new Date().toISOString(),
      error: error instanceof Error ? error.message : 'Unknown error',
    });
  }
});

// Metrics endpoint: guarded by `authenticate` and `authorize('admin')`;
// serves the registry's metrics with the registry's content type.
router.get('/metrics', authenticate, authorize('admin'), async (req, res) => {
  try {
    res.set('Content-Type', register.contentType);
    res.send(await register.metrics());
  } catch (error) {
    res.status(500).json({
      error: 'Failed to collect metrics',
    });
  }
});

export { router as monitoringRouter };
现在添加 Grafana 仪表板配置:
{
"annotations": {
"list": []
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"links": [],
"liveNow": false,
"panels": [
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 10,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": true,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"expr": "rate(http_requests_total[5m])",
"refId": "A"
}
],
"title": "Request Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "percent"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"id": 2,
"options": {
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"pluginVersion": "10.0.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"expr": "100 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes * 100)",
"refId": "A"
}
],
"title": "Memory Usage",
"type": "gauge"
}
],
"refresh": "5s",
"schemaVersion": 38,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "API Monitoring Dashboard",
"version": 0,
"weekStart": ""
}
第八步:实现缓存系统和性能优化。
// src/infrastructure/cache/cache-manager.ts
import { Redis } from 'ioredis';
import { logger } from '../logger';
import { metrics } from '../../monitoring/metrics';
/** Per-entry options accepted when writing to the cache. */
interface CacheOptions {
  /** Tags under which the key is indexed, for tag-based invalidation. */
  tags?: string[];
  /** Time to live for the entry; the cache's default is used when omitted. */
  ttl?: number;
}
/**
 * JSON cache on top of Redis with optional tag indexing.
 *
 * Every operation is best-effort: Redis and serialization failures are logged
 * and swallowed, and metric recording can never make a cache call fail.
 */
export class CacheManager {
  private readonly defaultTTL = 3600; // 1 hour

  // Process-wide lookup counters backing the cache hit-ratio gauge. Static so
  // that several CacheManager instances report one consistent ratio.
  private static hits = 0;
  private static lookups = 0;

  constructor(private readonly redis: Redis) {}

  /**
   * Reads and JSON-parses the value stored under `key`.
   *
   * @returns The parsed value, or null on a miss or on any error.
   */
  async get<T>(key: string): Promise<T | null> {
    const startTime = Date.now();
    try {
      const data = await this.redis.get(key);
      const hit = data !== null && data !== undefined;
      this.recordLookup(hit);
      if (!hit) {
        return null;
      }
      return JSON.parse(data) as T;
    } catch (error) {
      logger.error({ error, key }, 'Cache get error');
      return null;
    } finally {
      this.recordDuration('cache_get', startTime);
    }
  }

  /**
   * Serializes `value` as JSON and stores it with a TTL; when tags are given
   * the key is also added to each `tag:<name>` set, in the same MULTI.
   *
   * Values with no JSON representation (e.g. undefined) are not stored.
   */
  async set<T>(key: string, value: T, options: CacheOptions = {}): Promise<void> {
    const startTime = Date.now();
    try {
      const serializedValue = JSON.stringify(value);
      if (serializedValue === undefined) {
        // JSON.stringify(undefined) yields undefined; nothing sensible to store.
        logger.debug({ key }, 'Cache set skipped: value is not JSON-serializable');
        return;
      }
      // `||` on purpose: a ttl of 0 falls back to the default.
      const ttl = options.ttl || this.defaultTTL;
      const multi = this.redis.multi();
      // Main entry
      multi.setex(key, ttl, serializedValue);
      // Tag index
      if (options.tags?.length) {
        for (const tag of options.tags) {
          multi.sadd(`tag:${tag}`, key);
        }
      }
      await multi.exec();
    } catch (error) {
      logger.error({ error, key }, 'Cache set error');
    } finally {
      this.recordDuration('cache_set', startTime);
    }
  }

  /** Deletes a single key. */
  async invalidate(key: string): Promise<void> {
    try {
      await this.redis.del(key);
    } catch (error) {
      logger.error({ error, key }, 'Cache invalidate error');
    }
  }

  /** Deletes every key indexed under the given tags, plus the tag sets. */
  async invalidateByTags(tags: string[]): Promise<void> {
    try {
      // The tag sets are independent, so read them concurrently.
      const tagSets = await Promise.all(
        tags.map(async (tag) => ({
          tagKey: `tag:${tag}`,
          keys: await this.redis.smembers(`tag:${tag}`),
        }))
      );
      const multi = this.redis.multi();
      for (const { tagKey, keys } of tagSets) {
        if (keys.length) {
          multi.del(...keys);
          multi.del(tagKey);
        }
      }
      await multi.exec();
    } catch (error) {
      logger.error({ error, tags }, 'Cache invalidate by tags error');
    }
  }

  /**
   * Deletes every key matching a glob-style pattern.
   *
   * Iterates with SCAN in batches instead of KEYS, which walks the whole
   * keyspace in a single blocking command.
   */
  async invalidatePattern(pattern: string): Promise<void> {
    try {
      let cursor = '0';
      do {
        const [nextCursor, keys] = await this.redis.scan(
          cursor,
          'MATCH',
          pattern,
          'COUNT',
          100
        );
        cursor = nextCursor;
        if (keys.length) {
          await this.redis.del(...keys);
        }
      } while (cursor !== '0'); // a returned cursor of '0' ends the iteration
    } catch (error) {
      logger.error({ error, pattern }, 'Cache invalidate pattern error');
    }
  }

  /** Updates the hit-ratio gauge with hits / lookups seen so far. */
  private recordLookup(hit: boolean): void {
    CacheManager.lookups += 1;
    if (hit) {
      CacheManager.hits += 1;
    }
    this.recordSafely(() =>
      metrics.cacheHitRatio.set(CacheManager.hits / CacheManager.lookups)
    );
  }

  /**
   * Records the elapsed time of a cache operation.
   *
   * NOTE(review): this reuses the HTTP request-duration histogram with a
   * `type` label; confirm that label is declared on it, or introduce a
   * dedicated cache histogram.
   */
  private recordDuration(type: string, startTime: number): void {
    this.recordSafely(() =>
      metrics.httpRequestDurationSeconds.observe(
        { type },
        (Date.now() - startTime) / 1000
      )
    );
  }

  /** Runs a metric update; failures are logged, never propagated. */
  private recordSafely(record: () => void): void {
    try {
      record();
    } catch (error) {
      logger.debug({ error }, 'Cache metric recording failed');
    }
  }
}
// src/infrastructure/cache/cache-decorator.ts
import { CacheManager } from './cache-manager';
import { generateCacheKey } from './utils';
/** Options accepted by the Cacheable method decorator. */
interface CacheDecoratorOptions {
  /**
   * Cache key: either a fixed string or a function that builds the key from
   * the decorator target, the method name and the call arguments.
   */
  key?: ((target: any, propertyKey: string, args: any[]) => string) | string;
  /** Tags forwarded to the cache when the result is stored. */
  tags?: string[];
  /** Time to live forwarded to the cache when the result is stored. */
  ttl?: number;
}
/**
 * Method decorator that caches the awaited result of the decorated method.
 *
 * The cache key is `options.key` (a string, or a function of target, method
 * name and arguments); when omitted it is derived from the class name, method
 * name and arguments via generateCacheKey.
 *
 * @param options Key, TTL and tags for the cached entries.
 * @returns A decorator that replaces the method with a caching wrapper.
 */
export function Cacheable(options: CacheDecoratorOptions = {}) {
  return function (
    target: any,
    propertyKey: string,
    descriptor: PropertyDescriptor
  ) {
    const originalMethod = descriptor.value;
    const cacheManager = new CacheManager(globalRedis); // assumes a global Redis instance exists
    descriptor.value = async function (this: unknown, ...args: any[]) {
      const cacheKey = typeof options.key === 'function'
        ? options.key(target, propertyKey, args)
        : options.key || generateCacheKey(target.constructor.name, propertyKey, args);

      // The cache signals a miss with null. Compare explicitly so that cached
      // falsy results (0, false, '') are served instead of being recomputed
      // on every call.
      const cached = await cacheManager.get(cacheKey);
      if (cached !== null) {
        return cached;
      }

      // Miss: run the original method with the caller's `this`, then store.
      const result = await originalMethod.apply(this, args);
      await cacheManager.set(cacheKey, result, {
        ttl: options.ttl,
        tags: options.tags,
      });
      return result;
    };
    return descriptor;
  };
}
// src/infrastructure/cache/utils.ts
import { createHash } from 'crypto';
/**
 * Builds a cache key of the form `<className>:<methodName>:<hash>`, where the
 * hash is the hex SHA-256 digest of the JSON-serialized argument list.
 *
 * @param className  Name of the class owning the method.
 * @param methodName Name of the method.
 * @param args       Call arguments; serialized with JSON.stringify.
 * @returns The composed cache key.
 */
export function generateCacheKey(
  className: string,
  methodName: string,
  args: any[]
): string {
  const serializedArgs = JSON.stringify(args);
  const digest = createHash('sha256').update(serializedArgs).digest('hex');
  return [className, methodName, digest].join(':');
}
// src/middleware/cache.ts
import { Request, Response, NextFunction } from 'express';
import { CacheManager } from '../infrastructure/cache/cache-manager';
/** Options accepted by the HTTP response cache middleware. */
interface CacheMiddlewareOptions {
  /** Predicate deciding per request whether caching applies at all. */
  condition?: (req: Request) => boolean;
  /** Tags forwarded to the cache when a response is stored. */
  tags?: string[];
  /** Time to live forwarded to the cache when a response is stored. */
  ttl?: number;
}
/**
 * Express middleware that caches JSON responses of GET requests, keyed by
 * `http:<originalUrl>`.
 *
 * Only successful (2xx) responses are stored, so a transient error is not
 * replayed to later callers for the whole TTL.
 *
 * NOTE(review): the key is the URL only. Responses that differ per user or
 * per request header must be excluded through `options.condition`.
 *
 * @param options TTL, tags and an optional per-request predicate.
 */
export function cacheMiddleware(options: CacheMiddlewareOptions = {}) {
  const cacheManager = new CacheManager(globalRedis);
  return async (req: Request, res: Response, next: NextFunction) => {
    // Only GET requests are cached
    if (req.method !== 'GET') {
      return next();
    }
    // Caller-supplied opt-out
    if (options.condition && !options.condition(req)) {
      return next();
    }
    const cacheKey = `http:${req.originalUrl}`;
    try {
      // The cache signals a miss with null; compare explicitly so cached
      // falsy bodies (0, false, '') are still served from cache.
      const cached = await cacheManager.get(cacheKey);
      if (cached !== null) {
        return res.json(cached);
      }
      // Intercept res.json once to capture the body on its way out
      const originalJson = res.json;
      res.json = function (body: any) {
        // Restore first so the wrapper only ever runs once
        res.json = originalJson;
        if (res.statusCode >= 200 && res.statusCode < 300) {
          void cacheManager.set(cacheKey, body, {
            ttl: options.ttl,
            tags: options.tags,
          });
        }
        return originalJson.call(this, body);
      };
      next();
    } catch (error) {
      next(error);
    }
  };
}
// src/infrastructure/performance/query-optimizer.ts
import { db } from '../database';
import { logger } from '../logger';
import { metrics } from '../../monitoring/metrics';
import { QueryOptions } from '../../domain/interfaces/repository';
/** Helpers for inspecting queries and sanitising list-query options. */
export class QueryOptimizer {
  /** Largest page size a caller may request. */
  private static readonly MAX_LIMIT = 100;
  /** Fields that sorting is restricted to. */
  private static readonly INDEXED_SORT_FIELDS = ['id', 'created_at', 'updated_at'];
  /** Sort field substituted when the requested one is not allowed. */
  private static readonly FALLBACK_SORT_FIELD = 'created_at';

  /**
   * Runs `EXPLAIN ANALYZE` on the given query, logs the plan at debug level
   * and records how long the analysis took. Failures are logged, not thrown.
   *
   * WARNING(review): the query text is interpolated into the statement, so it
   * must never come from untrusted input. Assuming a PostgreSQL backend,
   * EXPLAIN ANALYZE also executes the statement, including data-modifying
   * ones — restrict this to trusted, read-only queries.
   */
  async analyzeQuery(query: string): Promise<void> {
    const startTime = Date.now();
    try {
      const explain = await db.execute(`EXPLAIN ANALYZE ${query}`);
      // Log the execution plan
      logger.debug({ explain }, 'Query execution plan');
      // Record how long the analysis took
      const duration = Date.now() - startTime;
      metrics.httpRequestDurationSeconds.observe(
        { type: 'query_analysis' },
        duration / 1000
      );
    } catch (error) {
      logger.error({ error, query }, 'Query analysis failed');
    }
  }

  /**
   * Returns a sanitised copy of the options: the limit is capped at 100,
   * 'id' is appended to a non-empty field selection that lacks it, and a
   * sort on a field outside the allowed list falls back to 'created_at'.
   *
   * The input object, its `fields` array and its `sort` object are left
   * untouched; the result is always a new object.
   */
  optimizeQueryOptions(options: QueryOptions): QueryOptions {
    const optimized: QueryOptions = { ...options };

    // Cap the page size
    if (optimized.limit && optimized.limit > QueryOptimizer.MAX_LIMIT) {
      optimized.limit = QueryOptimizer.MAX_LIMIT;
    }

    // Always include the primary key in an explicit field selection
    if (options.fields?.length) {
      optimized.fields = options.fields.includes('id')
        ? [...options.fields]
        : [...options.fields, 'id'];
    }

    // Only allow sorting on the listed fields
    if (options.sort) {
      optimized.sort = QueryOptimizer.INDEXED_SORT_FIELDS.includes(options.sort.field)
        ? { ...options.sort }
        : { ...options.sort, field: QueryOptimizer.FALLBACK_SORT_FIELD };
    }

    return optimized;
  }
}
// src/infrastructure/performance/connection-pool.ts
import { Pool } from 'pg';
import { logger } from '../logger';
import { metrics } from '../../monitoring/metrics';
/**
 * Wraps a pg Pool, publishes its size as a gauge and warns when the pool is
 * close to capacity.
 */
export class ConnectionPoolManager {
  private readonly pool: Pool;
  /** Handle of the periodic status reporter, kept so close() can stop it. */
  private monitorTimer?: ReturnType<typeof setInterval>;

  /**
   * @param config Pool configuration. `max` (20), `idleTimeoutMillis` (30000)
   *               and `connectionTimeoutMillis` (2000) are defaults that the
   *               caller's config may override.
   */
  constructor(config: any) {
    this.pool = new Pool({
      // Defaults first: values supplied by the caller take precedence instead
      // of being silently overwritten.
      max: 20,
      idleTimeoutMillis: 30000,
      connectionTimeoutMillis: 2000,
      ...config,
    });
    this.initializePoolMonitoring();
  }

  /** Subscribes to pool events and starts the periodic status report. */
  private initializePoolMonitoring(): void {
    this.pool.on('connect', () => {
      metrics.dbConnectionPoolSize.inc();
      logger.debug('New database connection established');
    });
    this.pool.on('remove', () => {
      metrics.dbConnectionPoolSize.dec();
      logger.debug('Database connection removed from pool');
    });
    this.pool.on('error', (err) => {
      logger.error('Unexpected error on idle client', err);
      metrics.apiErrorsTotal.inc({ type: 'db_error' });
    });
    // Report the pool size every 5 seconds
    this.monitorTimer = setInterval(() => {
      const poolStatus = this.pool.totalCount;
      metrics.dbConnectionPoolSize.set(poolStatus);
      if (poolStatus >= this.pool.options.max * 0.8) {
        logger.warn(
          `Connection pool nearing capacity: ${poolStatus}/${this.pool.options.max}`
        );
      }
    }, 5000);
    // Do not let the reporter alone keep the process alive.
    this.monitorTimer.unref?.();
  }

  /**
   * Checks a client out of the pool.
   *
   * @returns The connected client; the caller is responsible for releasing it.
   * @throws Re-throws the pool's error when no connection can be obtained.
   */
  async getConnection() {
    const startTime = Date.now();
    try {
      const client = await this.pool.connect();
      this.recordAcquisitionTime(startTime);
      return client;
    } catch (error) {
      logger.error('Failed to get database connection', error);
      throw error;
    }
  }

  /** Stops the periodic reporter and shuts the pool down. */
  async close(): Promise<void> {
    if (this.monitorTimer) {
      clearInterval(this.monitorTimer);
      this.monitorTimer = undefined;
    }
    await this.pool.end();
  }

  /**
   * Records how long the checkout took. Isolated in its own try/catch: if it
   * threw inside getConnection, the already checked-out client would never
   * reach the caller and could not be released.
   *
   * NOTE(review): this reuses the HTTP request-duration histogram with a
   * `type` label; confirm that label is declared on it.
   */
  private recordAcquisitionTime(startTime: number): void {
    try {
      metrics.httpRequestDurationSeconds.observe(
        { type: 'db_connection' },
        (Date.now() - startTime) / 1000
      );
    } catch (error) {
      logger.warn('Failed to record database connection timing', error);
    }
  }
}
让我们也添加一些性能优化的中间件:
// src/middleware/performance.ts
import { Request, Response, NextFunction } from 'express';
import compression from 'compression';
import { logger } from '../infrastructure/logger';
import { metrics } from '../monitoring/metrics';
// Compression middleware configuration: a request carrying an
// `x-no-compression` header is never compressed; every other request is
// decided by `compression.filter`.
export const compressionMiddleware = compression({
  threshold: 1024,
  level: 6,
  filter(req, res) {
    return req.headers['x-no-compression'] ? false : compression.filter(req, res);
  },
});
/**
 * Performance monitoring middleware. When the response finishes it records
 * the request duration in the request-duration histogram, logs duration and
 * heap growth at debug level, and logs a warning for requests that took
 * longer than 1000 ms.
 */
export function performanceMonitor(req: Request, res: Response, next: NextFunction) {
  const hrStart = process.hrtime();
  const heapAtStart = process.memoryUsage().heapUsed;

  const reportOnFinish = () => {
    const [secs, nanos] = process.hrtime(hrStart);
    // Elapsed time in milliseconds
    const duration = secs * 1000 + nanos / 1000000;
    const memoryUsed = process.memoryUsage().heapUsed - heapAtStart;

    // Request duration, in seconds
    const labels = {
      method: req.method,
      route: req.route?.path || 'unknown',
      status_code: res.statusCode.toString(),
    };
    metrics.httpRequestDurationSeconds.observe(labels, duration / 1000);

    // Per-request timing and heap delta
    logger.debug(
      { duration, memoryUsed, path: req.path, method: req.method },
      'Request performance metrics'
    );

    // Flag slow requests
    const isSlow = duration > 1000;
    if (isSlow) {
      logger.warn(
        { duration, path: req.path, method: req.method, query: req.query },
        'Slow request detected'
      );
    }
  };

  res.on('finish', reportOnFinish);
  next();
}
/**
 * Request throttling middleware factory (in-memory, per client IP).
 *
 * Within a sliding window of `windowMs`, a client that already made
 * `maxRequests` requests receives HTTP 429. Once a client has made more than
 * `delayAfter` requests, each further request is delayed by
 * `delayMs * (count - delayAfter)` milliseconds, capped at 10 seconds.
 *
 * @param options Optional settings; defaults: windowMs 60000, maxRequests 100,
 *                delayAfter 50, delayMs 500. May be omitted entirely.
 */
export function requestThrottling(options: {
  windowMs?: number;
  maxRequests?: number;
  delayAfter?: number;
  delayMs?: number;
} = {}) {
  const {
    windowMs = 60 * 1000,
    maxRequests = 100,
    delayAfter = 50,
    delayMs = 500,
  } = options;
  const requests = new Map<string, number[]>();
  let lastSweep = Date.now();

  // Drops clients whose most recent request has left the window, so the map
  // does not keep one entry forever for every IP ever seen.
  const sweepExpired = (now: number): void => {
    for (const [client, times] of requests) {
      // Timestamps are appended in order, so the last one is the newest.
      const newest = times[times.length - 1];
      if (newest === undefined || newest <= now - windowMs) {
        requests.delete(client);
      }
    }
    lastSweep = now;
  };

  return (req: Request, res: Response, next: NextFunction) => {
    // req.ip can be undefined; group such requests under one key.
    const key = req.ip ?? 'unknown';
    const now = Date.now();

    // At most one full sweep per window keeps the per-request cost low.
    if (now - lastSweep >= windowMs) {
      sweepExpired(now);
    }

    // Keep only this client's requests that are still inside the window
    const validRequests = (requests.get(key) ?? []).filter(
      (time) => time > now - windowMs
    );
    requests.set(key, validRequests);

    // Over the hard limit: reject
    if (validRequests.length >= maxRequests) {
      return res.status(429).json({
        error: {
          code: 'RATE_LIMIT_EXCEEDED',
          message: 'Too many requests, please try again later',
        },
      });
    }

    // Count this request
    validRequests.push(now);

    // Past the soft limit: slow the client down progressively
    if (validRequests.length > delayAfter) {
      const delay = delayMs * (validRequests.length - delayAfter);
      setTimeout(() => next(), Math.min(delay, 10000));
    } else {
      next();
    }
  };
}
/**
 * Response compression companion middleware.
 *
 * This middleware does not compress anything itself, so it must not set a
 * `Content-Encoding` header: announcing `br` or `gzip` for a body that was
 * never encoded makes clients fail to decode the response. It only marks the
 * response as varying by `Accept-Encoding`; the actual encoding (and the
 * matching `Content-Encoding` header) is left to the middleware that really
 * transforms the body.
 */
export function responseCompression(req: Request, res: Response, next: NextFunction) {
  // res.vary appends to any existing Vary header instead of replacing it.
  res.vary('Accept-Encoding');
  next();
}
// src/app.ts 中添加性能中间件
// Register the performance middleware in order: compression, performance
// monitoring, throttling (100 requests per 15-minute window), then
// responseCompression.
const performanceStack = [
  compressionMiddleware,
  performanceMonitor,
  requestThrottling({
    windowMs: 15 * 60 * 1000,
    maxRequests: 100,
  }),
  responseCompression,
];
for (const middleware of performanceStack) {
  app.use(middleware);
}
让我们总结一下我们已经完成的全部内容:
项目基础设施:
- 项目结构和配置管理
- TypeScript 配置
- 依赖注入系统
- 错误处理系统
数据库和ORM:
- Drizzle ORM 配置
- 数据库迁移
- 数据模型定义
- 仓储模式实现
认证和授权:
- JWT 认证
- 角色权限管理
- 访问控制
- 令牌管理
业务逻辑层:
- 服务层实现
- 业务规则验证
- 事件系统
- 审计日志
API 实现:
- RESTful 控制器
- 路由配置
- 请求验证
- 响应格式化
监控和可观测性:
- 健康检查
- 指标收集
- 日志系统
- 性能监控
缓存和性能:
- Redis 缓存
- 性能优化
- 连接池管理
- 请求限流
开发工具和文档:
- OpenAPI/Swagger 文档
- 开发环境配置
- Docker 部署
- 测试框架
完整的项目现在应该具备:
安全性:
- 认证授权
- 数据验证
- 速率限制
- 安全头部
可扩展性:
- 模块化架构
- 事件驱动
- 缓存系统
- 连接池
可维护性:
- 清晰的代码结构
- 完整的文档
- 统一的错误处理
- 类型安全
可观测性:
- 性能监控
- 日志记录
- 健康检查
- 指标收集
开发体验:
- 热重载
- 调试支持
- API 文档
- 测试支持
这个实现提供了一个完整的、企业级的 RESTful API 框架。