Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统

发布时间:2023-12-17 11:30

一、引言

在最近的业务中,笔者接到了一个需要处理约十万条数据的需求。这些数据都以字符串的形式给到,并且处理它们的步骤是异步且耗时的(平均处理一条数据需要 25s 的时间)。如果以串行的方式实现,其耗时是相当长的:

总耗时时间 = 数据量 × 单条数据处理时间

T = N * t (N = 100,000; t = 25s)

总耗时时间 = 2,500,000 秒 ≈ 695 小时 ≈ 29 天

显然,我们不能简单地把数据一条一条地处理。那么有没有办法能够减少处理的时间呢?经过调研后发现,使用异步任务队列是个不错的办法。

下文将和大家分享用 Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统的思路和方法,希望与大家一同交流。文章作者:jrain,腾讯应用研发工程师。

二、异步任务队列原理

我们可以把“处理单条数据”理解为一个异步任务,因此对这十万条数据的处理,就可以转化成有十万个异步任务等待进行。我们可以把这十万条数据塞到一个队列里面,让任务处理器自发地从队列里面去取得并完成。

任务处理器可以有多个,它们同时从队列里面把任务取走并处理。当任务队列为空,表示所有任务已经被认领完;当所有任务处理器完成任务,则表示所有任务已经被处理完。

其基本原理如下图所示:

Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统_第1张图片

首先来解决任务队列的问题。在这个需求中,任务队列里面的每一个任务,都包含了待处理的数据,数据以字符串的形式存在。为了方便起见,我们可以使用 Redis 的 List 数据格式来存放这些任务。

由于项目是基于 NodeJS 的,我们可以利用 PM2 的 Cluster 模式 [2] 来启动多个任务处理器,并行地处理任务。以一个 8 核的 CPU 为例,如果完全开启了多进程,其理论处理时间将提升 8 倍,从 29 天缩短到 3.6 天。

接下来,笔者将从实际编码的角度来讲解上述内容的实现过程。

三、使用 NodeJS 操作 Redis

异步任务队列使用 Redis 来实现,因此我们需要部署一个单独的 Redis 服务。在本地开发中为了快速完成 Redis 的安装,我使用了 Docker 的办法(默认机器已经安装了 Docker)。

1. Docker 拉取 Redis 镜像

docker pull redis:latest

2. Docker 启动 Redis

docker run -itd --name redis-local -p 6379:6379 redis

此时我们已经使用 Docker 启动了一个 Redis 服务,其对外的 IP 及端口为 127.0.0.1:6379。此外,我们还可以在本地安装一个名为 Another Redis DeskTop Manager[3] 的 Redis 可视化工具,来实时查看、修改 Redis 的内容。

Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统_第2张图片

在 NodeJS 中,我们可以使用 node-redis[4] 来操作 Redis。新建一个 mqclient.ts 文件并写入如下内容:

import * as Redis from 'redis'const client = Redis.createClient({  host: '127.0.0.1',  port: 6379})export default client

Redis 本质上是一个数据库,而我们对数据库的操作无非就是增删改查。node-redis 支持 Redis 的所有交互操作方式,但是操作结果默认是以回调函数的形式返回。

为了能够使用 async/await,我们可以新建一个 utils.ts 文件,把 node-redis 操作 Redis 的各种操作都封装成 Promise 的形式,方便我们后续使用。

import client from './mqClient'
// 获取 Redis 中某个 key 的内容export const getRedisValue = (key: string): Promise => new Promise(resolve => client.get(key, (err, reply) => resolve(reply)))// 设置 Redis 中某个 key 的内容export const setRedisValue = (key: string, value: string) => new Promise(resolve => client.set(key, value, resolve))// 删除 Redis 中某个 key 及其内容export const delRedisKey = (key: string) => new Promise(resolve => client.del(key, resolve))

除此之外,还能在 utils.ts 中放置其他常用的工具方法,以实现代码的复用、保证代码的整洁。

为了在 Redis 中创建任务队列,我们可以单独写一个 createTasks.ts 的脚本,用于往队列中塞入自定义的任务。

import { TASK_NAME, TASK_AMOUNT, setRedisValue, delRedisKey } from './utils'import client from './mqClient'
client.on('ready', async () => { await delRedisKey(TASK_NAME) for (let i = TASK_AMOUNT; i > 0 ; i--) { client.lpush(TASK_NAME, `task-${i}`) }
client.lrange(TASK_NAME, 0, TASK_AMOUNT, async (err, reply) => { if (err) { console.error(err) return } console.log(reply) process.exit() })})

在这段脚本中,我们从 utils.ts 中获取了各个 Redis 操作的方法,以及任务的名称 TASK_NAME (此处为 local_tasks)和任务的总数 TASK_AMOUNT(此处为 20 个)。

通过 LPUSH 方法往 TASK_NAME 的 List 当中塞入内容为 task-1 到 task-20 的任务,如图所示:

Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统_第3张图片

Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统_第4张图片

四、异步任务处理

首先新建一个 index.ts 文件,作为整个异步任务队列处理系统的入口文件。

import taskHandler from './tasksHandler'import client from './mqClient'
client.on('connect', () => { console.log('Redis is connected!')})client.on('ready', async () => { console.log('Redis is ready!') await taskHandler()})client.on('error', (e) => { console.log('Redis error! ' + e)})

在运行该文件时,会自动连接 Redis,并且在 ready 状态时执行任务处理器 taskHandler()。

在上一节的操作中,我们往任务队列里面添加了 20 个任务,每个任务都是形如 task-n 的字符串。为了验证异步任务的实现,我们可以在任务处理器 taskHandler.ts 中写一段 demo 函数,来模拟真正的异步任务:

function handleTask(task: string) { return new Promise((resolve) => { setTimeout(async () => { console.log(`Handling task: ${task}...`) resolve() }, 2000) }) }

上面这个 handleTask() 函数,将会在执行的 2 秒后打印出当前任务的内容,并返回一个 Promise,很好地模拟了异步函数的实现方式。接下来我们将会围绕这个函数,来处理队列中的任务。

其实到了这一步为止,整个异步任务队列处理系统已经基本完成了,只需要在 taskHandler.ts 中补充一点点代码即可:

import { popTask } from './utils'import client from './mqClient'
function handleTask(task: string) { /* ... */}
export default async function tasksHandler() { // 从队列中取出一个任务 const task = await popTask() // 处理任务 await handleTask(task) // 递归运行 await tasksHandler()}

最后,我们使用 PM2 启动 4 个进程,来试着跑一下整个项目:

pm2 start ./dist/index.js -i 4 && pm2 logs

Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统_第5张图片

Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统_第6张图片

可以看到,4 个任务处理器分别处理完了队列中的所有任务,相互之前互不影响。

事到如今已经大功告成了吗?未必。为了测试我们的这套系统到底提升了多少的效率,还需要统计完成队列里面所有任务的总耗时。

五、统计任务完成耗时

要统计任务完成的耗时,只需要实现下列的公式即可:

  • 总耗时 = 最后一个任务的完成时间 - 首个任务被取得的时间

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号