上一篇文章中引入了消息队列对秒杀流量做削峰的处理,我们使用的是Kafka,看起来似乎工作的不错,但其实还是有很多隐患存在,如果这些隐患不优化处理掉,那么秒杀抢购活动开始后可能会出现消息堆积、消费延迟、数据不一致、甚至服务崩溃等问题,那么后果可想而知。本篇文章我们就一起来把这些隐患解决掉。
批量数据聚合
在SeckillOrder这个方法中,每来一次秒杀抢购请求都往往Kafka中发送一条消息。假如这个时候有一千万的用户同时来抢购,就算我们做了各种限流策略,一瞬间还是可能会有上百万的消息会发到Kafka,会产生大量的网络IO和磁盘IO成本,大家都知道Kafka是基于日志的消息系统,写消息虽然大多情况下都是顺序IO,但当海量的消息同时写入的时候还是可能会扛不住。
那怎么解决这个问题呢?答案是做消息的聚合。之前发送一条消息就会产生一次网络IO和一次磁盘IO,我们做消息聚合后,比如聚合100条消息后再发送给Kafka,这个时候100条消息才会产生一次网络IO和磁盘IO,对整个Kafka的吞吐和性能是一个非常大的提升。其实这就是一种小包聚合的思想,或者叫Batch或者批量的思想。这种思想也随处可见,比如我们使用Mysql插入批量数据的时候,可以通过一条SQL语句执行而不是循环的一条一条插入,还有Redis的Pipeline操作等等。
那怎么来聚合呢,聚合策略是啥呢?聚合策略有两个维度分别是聚合消息条数和聚合时间,比如聚合消息达到100条我们就往Kafka发送一次,这个条数是可以配置的,那如果一直也达不到100条消息怎么办呢?通过聚合时间来兜底,这个聚合时间也是可以配置的,比如配置聚合时间为1秒钟,也就是无论目前聚合了多少条消息只要聚合时间达到1秒,那么就往Kafka发送一次数据。聚合条数和聚合时间是或的关系,也就是只要有一个条件满足就触发。
在这里我们提供一个批量聚合数据的工具Batcher,定义如下
type Batcher struct {
opts options
Do func(ctx context.Context, val map[string][]interface{})
Sharding func(key string) int
chans []chan *msg
wait sync.WaitGroup
}
Do方法:满足聚合条件后就会执行Do方法,其中val参数为聚合后的数据
Sharding方法:通过Key进行sharding,相同的key消息写入到同一个channel中,被同一个goroutine处理
在merge方法中有两个触发执行Do方法的条件,一是当聚合的数据条数大于等于设置的条数,二是当触发设置的定时器
代码实现比较简单,如下为具体实现:
type msg struct {
key string
val interface{}
}
type Batcher struct {
opts options
Do func(ctx context.Context, val map[string][]interface{})
Sharding func(key string) int
chans []chan *msg
wait sync.WaitGroup
}
func New(opts ...Option) *Batcher {
b := &Batcher{}
for _, opt := range opts {
opt.apply(&b.opts)
}
b.opts.check()
b.chans = make([]chan *msg, b.opts.worker)
for i := 0; i < b.opts.worker; i++ {
b.chans[i] = make(chan *msg, b.opts.buffer)
}
return b
}
func (b *Batcher) Start() {
if b.Do == nil {
log.Fatal("Batcher: Do func is nil")
}
if b.Sharding == nil {
log.Fatal("Batcher: Sharding func is nil")
}
b.wait.Add(len(b.chans))
for i, ch := range b.chans {
go b.merge(i, ch)
}
}
func (b *Batcher) Add(key string, val interface{}) error {
ch, msg := b.add(key, val)
select {
case ch <- msg:
default:
return ErrFull
}
return nil
}
func (b *Batcher) add(key string, val interface{}) (chan *msg, *msg) {
sharding := b.Sharding(key) % b.opts.worker
ch := b.chans[sharding]
msg := &msg{key: key, val: val}
return ch, msg
}
func (b *Batcher) merge(idx int, ch <-chan *msg) {
defer b.wait.Done()
var (
msg *msg
count int
closed bool
lastTicker = true
interval = b.opts.interval
vals = make(map[string][]interface{}, b.opts.size)
)
if idx > 0 {
interval = time.Duration(int64(idx) * (int64(b.opts.interval) / int64(b.opts.worker)))
}
ticker := time.NewTicker(interval)
for {
select {
case msg = <-ch:
if msg == nil {
closed = true
break
}
count++
vals[msg.key] = append(vals[msg.key], msg.val)
if count >= b.opts.size {
break
}
continue
case <-ticker.C:
if lastTicker {
ticker.Stop()
ticker = time.NewTicker(b.opts.interval)
lastTicker = false
}
}
if len(vals) > 0 {
ctx := context.Background()
b.Do(ctx, vals)
vals = make(map[string][]interface{}, b.opts.size)
count = 0
}
if closed {
ticker.Stop()
return
}
}
}
func (b *Batcher) Close() {
for _, ch := range b.chans {
ch <- nil
}
b.wait.Wait()
}
使用的时候需要先创建一个Batcher,然后定义Batcher的Sharding方法和Do方法,在Sharding方法中通过ProductID把不同商品的聚合投递到不同的goroutine中处理,在Do方法中我们把聚合的数据一次性批量的发送到Kafka,定义如下:
b := batcher.New(
batcher.WithSize(batcherSize),
batcher.WithBuffer(batcherBuffer),
batcher.WithWorker(batcherWorker),
batcher.WithInterval(batcherInterval),
)
b.Sharding = func(key string) int {
pid, _ := strconv.ParseInt(key, 10, 64)
return int(pid) % batcherWorker
}
b.Do = func(ctx context.Context, val map[string][]interface{}) {
var msgs []*KafkaData
for _, vs := range val {
for _, v := range vs {
msgs = append(msgs, v.(*KafkaData))
}
}
kd, err := json.Marshal(msgs)
if err != nil {
logx.Errorf("Batcher.Do json.Marshal msgs: %v error: %v", msgs, err)
}
if err = s.svcCtx.KafkaPusher.Push(string(kd)); err != nil {
logx.Errorf("KafkaPusher.Push kd: %s error: %v", string(kd), err)
}
}
s.batcher = b
s.batcher.Start()
在SeckillOrder方法中不再是每来一次请求就往Kafka中投递一次消息,而是先通过batcher提供的Add方法添加到Batcher中等待满足聚合条件后再往Kafka中投递。
err = l.batcher.Add(strconv.FormatInt(in.ProductId, 10), &KafkaData{Uid: in.UserId, Pid: in.ProductId})
if err!= nil {
logx.Errorf("l.batcher.Add uid: %d pid: %d error: %v", in.UserId, in.ProductId, err)
}
降低消息的消费延迟
通过批量消息处理的思想,我们提供了Batcher工具,提升了性能,但这主要是针对生产端而言的。当我们消费到批量的数据后,还是需要串行的一条条的处理数据,那有没有办法能加速消费从而降低消费消息的延迟呢?有两种方案分别是:
- 增加消费者的数量
- 在一个消费者中增加消息处理的并行度