为什么nodejs是单进程的_nodejs中的单进程异步锁——async-lock

发布时间:2022-12-28 15:30

nodejs中的单进程异步锁,node异步锁,async-lock

为什么是async-lock?

因为我见其下载量比较大(在我发现的所有类似库中是最大的)且最近更新时间较近。

场景

在异步函数中,由于存在异步操作,对此函数的A,B两次执行会出现结束次序不可控。即A先执行,却比B后退出函数。

对于数据库的操作,有时会将select的值进行一系列业务变化后重新update至数据库。就比如简单的计数器,0开始计数,假设由于业务原因,

必须在代码中进行+1操作。则A读取数据库进入异步执行,此时B也开始执行读取数据库的代码,最终后造成两次执行后计数器只加了1。

此时便需要锁机制,类似于Java中的synchronized或者lock。执行同一段代码时,添加相同的key,则发现此key正在使用,则直接放入此key的待执行数组。

async-lock原理

AsyncLock类:

queues:用于存储所有的待执行函数,格式为:{key: [function, function]}

acquire(key, fn):

key:需要上锁的名称,即queues中的key

fn:为需要执行的函数。操作即将fn添加到queues[key]对应的数组中。若queues无此key字段,则将起值置[]后直接执行。

当key对应函数执行完毕后,会检查queues中对应key值数组是否为空,为空则执行delete queues[key],否则继续执行下一个待执行函数。

其余使用细节就 略。

问题

使用此方式的最大问题就是只能单进程运行,除非可以保证同一key总能分配到同一进程。

示例

示例一:

redis.get('key', function(err, value) {

redis.set('key', value * 2);

});

0

1

2

redis.get('key',function(err,value){

redis.set('key',value *2);

});

结果:

user1: redis.get('key') -> 1

user2: redis.get('key') -> 1

user1: redis.set('key', 1 x 2) -> 2

user2: redis.set('key', 1 x 2) -> 2

0

1

2

3

user1:redis.get('key')->1

user2:redis.get('key')->1

user1:redis.set('key',1x2)->2

user2:redis.set('key',1x2)->2

显然这不是你所期望的,使用async-lock,您可以轻松地编写异步临界区,如下:

lock.acquire('key', function(cb) {

// Concurrency safe

redis.get('key', function(err, value) {

redis.set('key', value * 2, cb);

});

}, function(err, ret) {

});

0

1

2

3

4

5

6

lock.acquire('key',function(cb){

// Concurrency safe

redis.get('key',function(err,value){

redis.set('key',value *2,cb);

});

},function(err,ret){

});

示例二:

var AsyncLock = require('async-lock');

var lock = new AsyncLock();

/**

* @param {String|Array} key resource key or keys to lock

* @param {function} fn execute function

* @param {function} cb (optional) callback function, otherwise will return a promise

* @param {Object} opts (optional) options

*/

lock.acquire(key, function(done) {

// async work

done(err, ret);

}, function(err, ret) {

// lock released

}, opts);

// Promise mode

lock.acquire(key, function() {

// return value or promise

}, opts).then(function() {

// lock released

});

0

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

varAsyncLock=require('async-lock');

varlock=newAsyncLock();

/**

* @param {String|Array} key resource key or keys to lock

* @param {function} fn execute function

* @param {function} cb (optional) callback function, otherwise will return a promise

* @param {Object} opts (optional) options

*/

lock.acquire(key,function(done){

// async work

done(err,ret);

},function(err,ret){

// lock released

},opts);

// Promise mode

lock.acquire(key,function(){

// return value or promise

},opts).then(function(){

// lock released

});

示例三(错误处理):

// Callback mode

lock.acquire(key, function(done) {

done(new Error('error'));

}, function(err, ret) {

console.log(err.message) // output: error

});

// Promise mode

lock.acquire(key, function() {

throw new Error('error');

}).catch(function(err) {

console.log(err.message) // output: error

});

0

1

2

3

4

5

6

7

8

9

10

11

12

// Callback mode

lock.acquire(key,function(done){

done(newError('error'));

},function(err,ret){

console.log(err.message)// output: error

});

// Promise mode

lock.acquire(key,function(){

thrownewError('error');

}).catch(function(err){

console.log(err.message)// output: error

});

示例四(获取多个键):

lock.acquire([key1, key2], fn, cb);

0

lock.acquire([key1,key2],fn,cb);

示例五(重用锁):

锁在同一个域中可重用

var domain = require('domain');

var lock = new AsyncLock({domainReentrant : true});

var d = domain.create();

d.run(function() {

lock.acquire('key', function() {

//Enter lock

return lock.acquire('key', function() {

//Enter same lock twice

});

});

});

0

1

2

3

4

5

6

7

8

9

10

11

vardomain=require('domain');

varlock=newAsyncLock({domainReentrant:true});

vard=domain.create();

d.run(function(){

lock.acquire('key',function(){

//Enter lock

returnlock.acquire('key',function(){

//Enter same lock twice

});

});

});

示例六(配置项):

// 指定超时时间,单位毫秒

var lock = new AsyncLock({timeout: 5000});

lock.acquire(key, fn, function(err, ret) {

// 如果在给定的时间内锁没有被获取,超时错误将在这里返回

});

// 设置最大挂起任务数量

var lock = new AsyncLock({maxPending: 1000});

lock.acquire(key, fn, function(err, ret) {

// 如果超出最大挂起数量,则在这里处理错误

})

// 是否有任何正在运行或挂起的异步函数

lock.isBusy();

// 使用您自己的promise库,而不是全局promise变量

var lock = new AsyncLock({Promise: require('bluebird')}); // Bluebird

var lock = new AsyncLock({Promise: require('q')}); // Q

// 将一个任务添加到队列的前端,等待一个给定的锁

lock.acquire(key, fn1, cb); // 立即运行

lock.acquire(key, fn2, cb); // 添加到队列

lock.acquire(key, priorityFn, cb, {skipQueue: true}); // 跳转队列并在fn2之前运行

0

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

// 指定超时时间,单位毫秒

varlock=newAsyncLock({timeout:5000});

lock.acquire(key,fn,function(err,ret){

// 如果在给定的时间内锁没有被获取,超时错误将在这里返回

});

// 设置最大挂起任务数量

varlock=newAsyncLock({maxPending:1000});

lock.acquire(key,fn,function(err,ret){

// 如果超出最大挂起数量,则在这里处理错误

})

// 是否有任何正在运行或挂起的异步函数

lock.isBusy();

// 使用您自己的promise库,而不是全局promise变量

varlock=newAsyncLock({Promise:require('bluebird')});// Bluebird

varlock=newAsyncLock({Promise:require('q')});// Q

// 将一个任务添加到队列的前端,等待一个给定的锁

lock.acquire(key,fn1,cb);// 立即运行

lock.acquire(key,fn2,cb);// 添加到队列

lock.acquire(key,priorityFn,cb,{skipQueue:true});// 跳转队列并在fn2之前运行

注:未经实测,只是照抄!

——————————-

npm地址 https://www.npmjs.com/package/async-lock

git地址 https//github.com/rogierschouten/async-lock

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号