NestJS 队列
队列是一种有用的设计模式,可以帮助你处理一般应用规模和性能的挑战。一些队列可以帮助你处理的问题示例包括:
- 平滑输出峰值。例如,如果用户可以在任何时间创建资源敏感型任务,你可以将其添加到一个消息队列中而不是同步执行。然后你可以通过工作者进程从队列中以一个可控的方式取出进程。在应用规模增大时,你可以轻松添加新的队列消费者来提高后端任务处理能力。
- 将可能阻塞Node.js事件循环的整体任务打碎。例如,如果一个用户请求是 CPU 敏感型工作,例如音频转码,你可以将其委托给其他进程,从而保证用户接口进程保持响应。
- 在不同的服务间提供一个可信的通讯通道。例如,你可以将任务(工作)加入一个进程或服务,并由另一个进程或服务来消费他们。你可以在由其他任何进程或服务执行的工作完成、错误或者其他状态变化时得到通知(通过监听状态事件)。当队列生产者或者消费者失败时,他们的状态会被保留,任务将在 node 重启后自动重启。
Nest 提供了@nestjs/bull包,这是Bull包的一个包装器,Bull 是一个流行的、支持良好的、高性能的基于 Nodejs 的消息队列系统应用。该包将 Bull 队列以 Nest 友好的方式添加到你的应用中。
Bull 使用Redis持久化工作数据,因此你需要在你的系统中安装 Redis。因为他是基于 Redis 的,你的队列结构可以是完全分布式的并且和平台无关。例如,你可以有一些队列生产者、消费者和监听者,他们运行在 Nest 的一个或多个节点上,同时,其他生产者、消费者和监听者在其他 Node.js 平台或者其他网络节点上。
本章使用@nestjs/bull包,我们同时推荐阅读BUll 文档来获取更多背景和应用细节。
安装
要开始使用,我们首先安装需要的依赖:
$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull
一旦安装过程完成,我们可以在根AppModule中导入BullModule。
app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [
BullModule.registerQueue({
name: 'audio',
redis: {
host: 'localhost',
port: 6379,
},
}),
],
})
export class AppModule {}
registerQueue()方法用于实例化并/或注册队列。队列在不同的模块和进程之间共享,在底层则通过同样的凭据连接到同样的 Redis 数据库。每个队列由其name属性区分(如下),当共享队列(跨模块/进程)时,第一个registerQueue()方法同时实例化该队列并向模块注册它。其他模块(在相同或者不同进程下)则简单地注册队列。队列注册创建一个injection token,它可以被用在给定 Nest 模块中获取队列。
针对每个队列,传递一个包含下列属性的配置对象:
-name:string- 一个队列名称,它可以被用作injection token(用于将队列注册到控制器/提供者),也可以作为装饰器参数来将消费者类和监听者与队列联系起来。是必须的。 -limiter:RateLimiter-该选项用于确定消息队列处理速率,查看RateLimiter获取更多信息。可选的。 -redis:RedisOpts-该选项用于配置 Redis 连接,查看RedisOpts获取更多信息。可选的。 -prefix: string-队列所有键的前缀。可选的。 -defaultJobOptions: JobOpts-选项用以控制新任务的默认属性。查看JobOpts获取更多信息。可选的。 -settings: AdvancedSettings-高级队列配置设置。这些通常不需要改变。查看AdvancedSettings获取更多信息。可选的。
注意,name属性是必须的。其他选项是可选的,为队列行为提供更细节的控制。这些会直接传递给 Bull 的Queue构造器。在这里阅读更多选项。当在第二个或者子模块中注册一个队列时,最佳时间是省略配置对象中除name属性之外的所有选项。这些选项仅应该在实例化队列的模块中确定。
在registerQueue()方法中传递多个逗号分隔的选项对象来创建多个队列。
由于任务在 Redis 中是持久化的,每次当一个特定名称的队列被实例化时(例如,当一个 app 启动/重启时),它尝试处理任何可能在前一个旧的任务遗留未完成的session。
每个队里可能有一个或很多生产者、消费者以及监听者。消费者从一个特定命令队列中获取任务:FIFO(默认,先进先出),LIFO(后进先出)或者依据优先级。
控制队列处理命令在这里讨论。
生产者
任务生产者添加任务到队列中。生产者是典型的应用服务(Nest 提供者)。要添加工作到一个队列,首先注册队列到服务中:
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
@InjectQueue()装饰器由其名称指定队列,像它在registerQueue()方法中提供的那样(例如,audio)。
现在,通过调用队列的add()方法添加一个任务,传递一个用户定义的任务对象。任务表现为序列化的JavaScript对象(因为它们被存储在 Redis 数据库中)。你传递的任务形式是可选的;用它来在语义上表示你任务对象:
const job = await this.audioQueue.add({
foo: 'bar',
});
命名的任务
任务需要独一无二的名字。这允许你创建专用的消费者,这将仅处理给定名称的处理任务。
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});
当使用命名任务时,你必须为每个添加到队列中的特有名称创建处理者,否则队列会反馈缺失了给定任务的处理器。查看这里阅读更多关于消费命名任务的信息。
任务选项
任务可以包括附加选项。在Quene.add()方法的job参数之后传递选项对象。任务选项属性有:
- priority: number-选项优先级值。范围从 1(最高优先)到 MAX_INT(最低优先)。注意使用属性对性能有轻微影响,因此要小心使用。
- delay: number- 任务执行前等待的时间(毫秒)。注意,为了精确延时,服务端和客户端时钟应该同步。
- attempts: number-任务结束前总的尝试次数。
- repeat: RepeatOpts-按照定时设置重复任务记录,查看RepeatOpts。
- backoff: number | BackoffOpts- 如果任务失败,自动重试闪避设置,查看BackoffOpts。
- lifo: boolean-如果为true,从队列右端添加任务以替代从左边添加(默认为 false)。
- timeout: number-任务超时失败的毫秒数。
- jobId: number | string- 覆盖任务 ID-默认地,任务 ID 是唯一的整数,但你可以使用该参数覆盖它。如果你使用这个选项,你需要保证jobId是唯一的。如果你尝试添加一个包含已有 id 的任务,它不会被添加。
- removeOnComplete: boolean | number-如果为true,当任务完成时移除任务。一个数字用来指定要保存的任务数。默认行为是将完成的工作保存在已完成的设置中。
- removeOnFail: boolean | number-如果为true,当所有尝试失败时移除任务。一个数字用来指定要保存的任务数。默认行为是将失败的任务保存在已失败的设置中。
- stackTraceLimit: number-限制在stacktrace中保存的堆栈跟踪线。
这里是一些带有任务选项的自定义任务示例。
要延迟任务的开始,使用delay配置属性:
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ delay: 3000 } // 3 seconds delayed
);
要从右端添加任务到队列(以 LIFO(后进先出)处理任务),设置配置对象的lifo属性为true。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ lifo: true }
);
要优先一个任务,使用priority属性。
const job = await this.audioQueue.add(
{
foo: 'bar',
},
{ priority: 2 }
);
消费者
消费者是一个类,定义的方法要么处理添加到队列中的任务,要么监听队列的事件,或者两者皆有。使用@Processor()装饰器来定义消费者类,如下:
import { Processor } from '@nestjs/bull';
@Processor('audio')
export class AudioConsumer {}
装饰器的字符串参数(例如,audio)是和类方法关联的队列名称。
在消费者类中,使用@Process()装饰器来装饰任务处理者。
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@Process()
async transcode(job: Job<unknown>) {
let progress = 0;
for (i = 0; i < 100; i++) {
await doSomething(job.data);
progress += 10;
job.progress(progress);
}
return {};
}
}
装饰器方法(例如transcode()) 在工作空闲或者队列中有消息要处理的时候被调用。该处理器方法接受job对象作为其仅有的参数。处理器方法的返回值被保存在任务对象中,可以在之后被访问,例如,在用于完成事件的监听者中。
Job对象有多个方法,允许你和他们的状态交互。例如,上述代码使用progress()方法来更新工作进程。查看这里以了解完整的Job对象 API 参照。
你可以指定一个任务处理方法,仅处理指定类型(包含特定name的任务)的任务,这可以通过如下所述的将name传递给@Process()装饰器完成。你在一个给定消费者类中可以有多个@Process()处理器,以反应每个任务类型(name),确保每个name有相应的处理者。
@Process('transcode')
async transcode(job: Job<unknown>) { ... }
事件监听者
当队列和/或任务状态改变时,Bull生成一个有用的事件集合。Nest 提供了一个装饰器集合,允许订阅一系列标准核心事件集合。他们从@nestjs/bull包中导出。
事件监听者必须在一个消费者类中声明(通过@Processor()装饰器)。要监听一个事件,使用如下表格之一的装饰器来声明一个事件处理器。例如,当一个任务进入audio队列活跃状态时,要监听其发射的事件,使用下列结构:
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
鉴于 BUll 运行于分布式(多 node)环境,它定义了本地事件概念。该概念可以辨识出一个由完整的单一进程触发的事件,或者由不同进程共享的队列。一个本地事件是指在本地进程中触发的一个队列行为或者状态变更。换句话说,当你的事件生产者和消费者是本地单进程时,队列中所有事件都是本地的。
当一个队列在多个进程中共享时,我们可能要遇到全局事件。对一个由其他进程触发的事件通知器进程的监听者来说,它必须注册为全局事件。
当相应事件发射时事件处理器被唤醒。该处理器被下表所示的签名调用,提供访问事件相关的信息。我们讨论下面签名中本地和全局事件处理器。
本地事件监听者 | 全局事件监听者 | 处理器方法签名/当触发时 |
---|---|---|
@OnQueueError() | @OnGlobalQueueError() | handler(error: Error) - 当错误发生时,error 包括触发错误 |
@OnQueueWaiting() | @OnGlobalQueueWaiting() | handler(jobId: number | string) - 一旦工作者空闲就等待执行的任务,jobId 包括进入此状态的 id |
@OnQueueActive() | @OnGlobalQueueActive() | handler(job: Job)-job 任务已启动 |
@OnQueueStalled() | @OnGlobalQueueStalled() | handler(job: Job)-job 任务被标记为延迟。这在时间循环崩溃或暂停时进行调试工作时是很有效的 |
@OnQueueProgress() | @OnGlobalQueueProgress() | handler(job: Job, progress: number)-job 任务进程被更新为progress 值 |
@OnQueueCompleted() | @OnGlobalQueueCompleted() | handler(job: Job, result: any) job 任务进程成功以result 结束 |
@OnQueueFailed() | @OnGlobalQueueFailed() | handler(job: Job, err: Error)job 任务以err 原因失败 |
@OnQueuePaused() | @OnGlobalQueuePaused() | handler()队列被暂停 |
@OnQueueResumed() | @OnGlobalQueueResumed() | handler(job: Job)队列被恢复 |
@OnQueueCleaned() | @OnGlobalQueueCleaned() | handler(jobs: Job[], type: string) 旧任务从队列中被清理,job 是一个清理任务数组,type 是要清理的任务类型 |
@OnQueueDrained() | @OnGlobalQueueDrained() | handler()在队列处理完所有等待的任务(除非有些尚未处理的任务被延迟)时发射出 |
@OnQueueRemoved() | @OnGlobalQueueRemoved() | handler(job: Job)job 任务被成功移除 |
当监听全局事件时,签名方法可能和本地有一点不同。特别地,本地版本的任何方法签名接受job对象的方法签名而不是全局版本的jobId(number)。要在这种情况下获取实际的job对象的引用,使用Queue#getJob方法。这种调用可能需要等待,因此处理者应该被声明为async,例如:
@OnGlobalQueueCompleted()
async onGlobalCompleted(jobId: number, result: any) {
const job = await this.immediateQueue.getJob(jobId);
console.log('(Global) on completed: job ', job.id, ' -> result: ', result);
}
要获取一个Queue对象(使用getJob()调用),你当然必须注入它。同时,队列必须注册到你要注入的模块中。
在特定事件监听器装饰器之外,你可以使用通用的@OnQueueEvent()装饰器与BullQueueEvents或者BullQueueGlobalEvents枚举相结合。在这里阅读更多有关事件的内容。
队列管理
队列有一个 API 来实现管理功能比如暂停、恢复、检索不同状态的任务数量等。你可以在这里找到完整的队列 API。直接在Queue对象上调用这些方法,如下所示的暂停/恢复示例。
使用pause()方法调用来暂停队列。一个暂停的队列在恢复前将不会处理新的任务,但会继续处理完当前执行的任务。
await audioQueue.pause();
要恢复一个暂停的队列,使用resume()方法,如下:
await audioQueue.resume();
异步配置
你可能需要异步而不是静态传递队列选项。在这种情况下,使用registerQueueAsync()方法,可以提供不同的异步配置方法。
一个方法是使用工厂函数:
BullModule.registerQueueAsync({
name: 'audio',
useFactory: () => ({
redis: {
host: 'localhost',
port: 6379,
},
}),
});
我们的工厂函数方法和其他异步提供者(它可以是async的并可以使用inject来注入)方法相同。
BullModule.registerQueueAsync({
name: 'audio',
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
redis: {
host: configService.get('QUEUE_HOST'),
port: +configService.get('QUEUE_PORT'),
},
}),
inject: [ConfigService],
});
可选的,你可以使用useClass语法。
BullModule.registerQueueAsync({
name: 'audio',
useClass: BullConfigService,
});
上述结构在BullModule中实例化BullConfigService,并通过调用createBullOptions()来用它提供一个选项对象。注意这意味着BullConfigService要实现BullOptionsFactory工厂接口,如下:
@Injectable()
class BullConfigService implements BullOptionsFactory {
createBullOptions(): BullModuleOptions {
return {
redis: {
host: 'localhost',
port: 6379,
},
};
}
}
要阻止在BullModule中创建BullConfigService并使用一个从其他模块导入的提供者,可以使用useExisting语法。
BullModule.registerQueueAsync({
name: 'audio',
imports: [ConfigModule],
useExisting: ConfigService,
});
这个结构和useClass有一个根本区别——BullModule将查找导入的模块来重用现有的ConfigServie而不是实例化一个新的。
示例
一个可用的示例见这里。
更多建议: