发布时间:2022-12-28 15:30
nodejs中的单进程异步锁,node异步锁,async-lock
为什么是async-lock?
因为我见其下载量比较大(在我发现的所有类似库中是最大的)且最近更新时间较近。
场景
在异步函数中,由于存在异步操作,对此函数的A,B两次执行会出现结束次序不可控。即A先执行,却比B后退出函数。
对于数据库的操作,有时会将select的值进行一系列业务变化后重新update至数据库。就比如简单的计数器,0开始计数,假设由于业务原因,
必须在代码中进行+1操作。则A读取数据库进入异步执行,此时B也开始执行读取数据库的代码,最终后造成两次执行后计数器只加了1。
此时便需要锁机制,类似于Java中的synchronized或者lock。执行同一段代码时,添加相同的key,则发现此key正在使用,则直接放入此key的待执行数组。
async-lock原理
AsyncLock类:
queues:用于存储所有的待执行函数,格式为:{key: [function, function]}
acquire(key, fn):
key:需要上锁的名称,即queues中的key
fn:为需要执行的函数。操作即将fn添加到queues[key]对应的数组中。若queues无此key字段,则将起值置[]后直接执行。
当key对应函数执行完毕后,会检查queues中对应key值数组是否为空,为空则执行delete queues[key],否则继续执行下一个待执行函数。
其余使用细节就 略。
问题
使用此方式的最大问题就是只能单进程运行,除非可以保证同一key总能分配到同一进程。
示例
示例一:
redis.get('key', function(err, value) {
redis.set('key', value * 2);
});
0
1
2
redis.get('key',function(err,value){
redis.set('key',value *2);
});
结果:
user1: redis.get('key') -> 1
user2: redis.get('key') -> 1
user1: redis.set('key', 1 x 2) -> 2
user2: redis.set('key', 1 x 2) -> 2
0
1
2
3
user1:redis.get('key')->1
user2:redis.get('key')->1
user1:redis.set('key',1x2)->2
user2:redis.set('key',1x2)->2
显然这不是你所期望的,使用async-lock,您可以轻松地编写异步临界区,如下:
lock.acquire('key', function(cb) {
// Concurrency safe
redis.get('key', function(err, value) {
redis.set('key', value * 2, cb);
});
}, function(err, ret) {
});
0
1
2
3
4
5
6
lock.acquire('key',function(cb){
// Concurrency safe
redis.get('key',function(err,value){
redis.set('key',value *2,cb);
});
},function(err,ret){
});
示例二:
var AsyncLock = require('async-lock');
var lock = new AsyncLock();
/**
* @param {String|Array} key resource key or keys to lock
* @param {function} fn execute function
* @param {function} cb (optional) callback function, otherwise will return a promise
* @param {Object} opts (optional) options
*/
lock.acquire(key, function(done) {
// async work
done(err, ret);
}, function(err, ret) {
// lock released
}, opts);
// Promise mode
lock.acquire(key, function() {
// return value or promise
}, opts).then(function() {
// lock released
});
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
varAsyncLock=require('async-lock');
varlock=newAsyncLock();
/**
* @param {String|Array} key resource key or keys to lock
* @param {function} fn execute function
* @param {function} cb (optional) callback function, otherwise will return a promise
* @param {Object} opts (optional) options
*/
lock.acquire(key,function(done){
// async work
done(err,ret);
},function(err,ret){
// lock released
},opts);
// Promise mode
lock.acquire(key,function(){
// return value or promise
},opts).then(function(){
// lock released
});
示例三(错误处理):
// Callback mode
lock.acquire(key, function(done) {
done(new Error('error'));
}, function(err, ret) {
console.log(err.message) // output: error
});
// Promise mode
lock.acquire(key, function() {
throw new Error('error');
}).catch(function(err) {
console.log(err.message) // output: error
});
0
1
2
3
4
5
6
7
8
9
10
11
12
// Callback mode
lock.acquire(key,function(done){
done(newError('error'));
},function(err,ret){
console.log(err.message)// output: error
});
// Promise mode
lock.acquire(key,function(){
thrownewError('error');
}).catch(function(err){
console.log(err.message)// output: error
});
示例四(获取多个键):
lock.acquire([key1, key2], fn, cb);
0
lock.acquire([key1,key2],fn,cb);
示例五(重用锁):
锁在同一个域中可重用
var domain = require('domain');
var lock = new AsyncLock({domainReentrant : true});
var d = domain.create();
d.run(function() {
lock.acquire('key', function() {
//Enter lock
return lock.acquire('key', function() {
//Enter same lock twice
});
});
});
0
1
2
3
4
5
6
7
8
9
10
11
vardomain=require('domain');
varlock=newAsyncLock({domainReentrant:true});
vard=domain.create();
d.run(function(){
lock.acquire('key',function(){
//Enter lock
returnlock.acquire('key',function(){
//Enter same lock twice
});
});
});
示例六(配置项):
// 指定超时时间,单位毫秒
var lock = new AsyncLock({timeout: 5000});
lock.acquire(key, fn, function(err, ret) {
// 如果在给定的时间内锁没有被获取,超时错误将在这里返回
});
// 设置最大挂起任务数量
var lock = new AsyncLock({maxPending: 1000});
lock.acquire(key, fn, function(err, ret) {
// 如果超出最大挂起数量,则在这里处理错误
})
// 是否有任何正在运行或挂起的异步函数
lock.isBusy();
// 使用您自己的promise库,而不是全局promise变量
var lock = new AsyncLock({Promise: require('bluebird')}); // Bluebird
var lock = new AsyncLock({Promise: require('q')}); // Q
// 将一个任务添加到队列的前端,等待一个给定的锁
lock.acquire(key, fn1, cb); // 立即运行
lock.acquire(key, fn2, cb); // 添加到队列
lock.acquire(key, priorityFn, cb, {skipQueue: true}); // 跳转队列并在fn2之前运行
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 指定超时时间,单位毫秒
varlock=newAsyncLock({timeout:5000});
lock.acquire(key,fn,function(err,ret){
// 如果在给定的时间内锁没有被获取,超时错误将在这里返回
});
// 设置最大挂起任务数量
varlock=newAsyncLock({maxPending:1000});
lock.acquire(key,fn,function(err,ret){
// 如果超出最大挂起数量,则在这里处理错误
})
// 是否有任何正在运行或挂起的异步函数
lock.isBusy();
// 使用您自己的promise库,而不是全局promise变量
varlock=newAsyncLock({Promise:require('bluebird')});// Bluebird
varlock=newAsyncLock({Promise:require('q')});// Q
// 将一个任务添加到队列的前端,等待一个给定的锁
lock.acquire(key,fn1,cb);// 立即运行
lock.acquire(key,fn2,cb);// 添加到队列
lock.acquire(key,priorityFn,cb,{skipQueue:true});// 跳转队列并在fn2之前运行
注:未经实测,只是照抄!
——————————-
npm地址 https://www.npmjs.com/package/async-lock
git地址 https//github.com/rogierschouten/async-lock