并发控制队列
场景:批量上传文件
"假设用户要上传 100 张图片,但是服务器限制同时最多只能处理 5 个请求,你会怎么处理?"
让我们实现一个通用的并发控制队列:
typescript
class TaskQueue {
private queue: (() => Promise<any>)[] = [] // 存储待执行的任务队列
private activeCount = 0 // 当前正在执行的任务数量
private maxConcurrent: number // 最大并发数
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent
}
// 获取待执行任务数量
public get pending(): number {
return this.queue.length
}
// 取当前执行中的任务数量
public get active(): number {
return this.activeCount
}
// 清空任务队列
public clear(): void {
this.queue = []
}
// 执行下一个任务
private next() {
if (this.queue.length === 0 || this.activeCount >= this.maxConcurrent) {
return
}
const task = this.queue.shift()
if (task) {
this.activeCount++
task().finally(() => {
this.activeCount--
this.next()
})
}
}
// 添加新任务到队列
public add<T>(fn: () => Promise<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
const task = async () => {
try {
resolve(await fn())
} catch (error) {
reject(error)
}
}
this.queue.push(task)
this.next()
})
}
}类的属性
- queue : 存储待执行的任务队列
- activeCount : 当前正在执行的任务数量
- maxConcurrent : 最大并发数
核心方法
- add : 添加新任务到队列
- next : 执行下一个任务
- clear : 清空任务队列
- pending : 获取待执行任务数量
- active : 获取当前执行中的任务数量
示例
typescript
// 创建队列实例,最大并发数为2
const queue = new TaskQueue(2)
// 模拟异步任务
const createTask = (id: number) => {
return () =>
new Promise<string>((resolve) => {
const duration = Math.random() * 2000
setTimeout(() => {
console.log(`Task ${id} completed`)
resolve(`Task ${id} result`)
}, duration)
})
}
// 添加任务
async function runTasks() {
console.log('开始执行任务')
// 添加5个任务
for (let i = 1; i <= 5; i++) {
queue.add(createTask(i)).then((result) => console.log(result))
console.log(`Task ${i} added, pending: ${queue.pending}, active: ${queue.active}`)
}
}
runTasks()