【mysql批量入库】
需求背景:有个调用统计日志存储和统计需求,要求存储到mysql中;存储数据高峰能达到日均千万,瓶颈在于直接入库并发太高,可能会把mysql干垮。
问题分析
思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择;但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试。
解决:
问题一:要求日志最好入库;但是,直接入库mysql确实扛不住,批量入库没有问题,done。【批量入库和直接入库性能差异参考文章】
问题二:批量入库就需要有高并发的消息队列,决定采用redis list 仿真实现,而且方便回滚。
问题三:日志量毕竟大,保存最近30条足矣,决定用php写个离线统计和清理脚本。
简单实现过程
一:设计数据库表和存储
考虑到log系统对数据库的性能更多一些,稳定性和安全性没有那么高,存储引擎自然是只支持select insert 没有索引的archive。如果确实有update需求,也可以采用myISAM。
考虑到log是实时记录的所有数据,数量可能巨大,主键采用bigint,自增即可。
考虑到log系统以写为主,统计采用离线计算,字段均不要出现索引,因为一方面可能会影响插入数据效率,另外读时候会造成死锁,影响写数据。
二:redis存储数据形成消息队列
由于高并发,尽可能简单,直接,上代码。
/***************************************************************************
*
* 获取到的调用日志,存入redis的队列中.
* $Id$
*
**************************************************************************/
/**
* @file saveLog.php
* @date 2015/11/06 20:47:13
* @author:cuihuan
* @version $Revision$
* @brief
*
**/
// 获取info
$interface_info = $_GET['info'];
// 存入redis队列
$redis = new Redis();
$redis->connect('xx', 6379);
$redis->auth("password");
// 加上时间戳存入队列
$now_time = date("Y-m-d H:i:s");
$redis->rPush("call_log", $interface_info . "%" . $now_time);
$redis->close();
/* vim: set ts=4 sw=4 sts=4 tw=100 */
?>
三:数据定时批量入库。
定时读取redis消息队列里面的数据,批量入库。
/**
* 获取redis消息队列中的脚本,拼接sql,批量入库。
* @update 2015-11-07 添加失败消息队列回滚机制
*
* @Author:cuihuan
* 2015-11-06
* */
// init redis
$redis_xx = new Redis();
$redis_xx->connect('ip', port);
$redis_xx->auth("password");
// 获取现有消息队列的长度
$count = 0;
$max = $redis_xx->lLen("call_log");
// 获取消息队列的内容,拼接sql
$insert_sql = "insert into fb_call_log (`interface_name`, `createtime`) values ";
// 回滚数组
$roll_back_arr = array();
while ($count < $max) {
$log_info = $redis_cq01->lPop("call_log");
$roll_back_arr = $log_info;
if ($log_info == 'nil' || !isset($log_info)) {
$insert_sql .= ";";
break;
}
// 切割出时间和info
$log_info_arr = explode("%",$log_info);
$insert_sql .= " ('".$log_info_arr[0]."','".$log_info_arr[1]."'),";
$count++;
}
// 判定存在数据,批量入库
if ($count != 0) {
$link_2004 = mysql_connect('ip:port', 'user', 'password');
if (!$link_2004) {
die("Could not connect:" . mysql_error());
}
$crowd_db = mysql_select_db('fb_log', $link_2004);
$insert_sql = rtrim($insert_sql,",").";";
$res = mysql_query($insert_sql);
// 输出入库log和入库结果;
echo date("Y-m-d H:i:s")."insert ".$count." log info result:";
echo json_encode($res);
echo "\n";
// 数据库插入失败回滚
if(!$res){
foreach($roll_back_arr as $k){
$redis_xx->rPush("call_log", $k);
}
}
// 释放连接
mysql_free_result($res);
mysql_close($link_2004);
}
// 释放redis
$redis_cq01->close();
?>
四:离线天级统计和清理数据脚本
/**
* static log :每天离线统计代码日志和删除五天前的日志
*
* @Author:cuihuan
* 2015-11-06
* */
// 离线统计
$link_2004 = mysql_connect('ip:port', 'user', 'pwd');
if (!$link_2004) {
die("Could not connect:" . mysql_error());
}
$crowd_db = mysql_select_db('fb_log', $link_2004);
// 统计昨天的数据
$day_time = date("Y-m-d", time() - 60 * 60 * 24 * 1);
$static_sql = "get sql";
$res = mysql_query($static_sql, $link_2004);
// 获取结果入库略
// 清理15天之前的数据
$before_15_day = date("Y-m-d", time() - 60 * 60 * 24 * 15);
$delete_sql = "delete from xxx where createtime < '" . $before_15_day . "'";
try {
$res = mysql_query($delete_sql);
}catch(Exception $e){
echo json_encode($e)."\n";
echo "delete result:".json_encode($res)."\n";
}
mysql_close($link_2004);
?>
五:代码部署
主要是部署,批量入库脚本的调用和天级统计脚本,crontab例行运行。
# 批量入库脚本
*/2 * * * * /home/cuihuan/xxx/lamp/php5/bin/php /home/cuihuan/xxx/batchLog.php >>/home/cuihuan/xxx/batchlog.log
# 天级统计脚本
0 5 * * * /home/cuihuan/xxx/php5/bin/php /home/cuihuan/xxx/staticLog.php >>/home/cuihuan/xxx/staticLog.log
总结:相对于其他复杂的方式处理高并发,这个解决方案简单有效:通过redis缓存抗压,mysql批量入库解决数据库瓶颈,离线计算解决统计数据,通过定期清理保证库的大小。
【队列实现的方案】
方案一:使用消息队列来实现
可以基于例如MemcacheQ等这样的消息队列,具体的实现方案这么表述吧
比如有100张票可供用户抢,那么就可以把这100张票放到缓存中,读写时不要加锁。 当并发量大的时候,可能有500人左右抢票成功,这样对于500后面的请求可以直接转到活动结束的静态页面。进去的500个人中有400个人是不可能获得商品的。所以可以根据进入队列的先后顺序只能前100个人购买成功。后面400个人就直接转到活动结束页面。当然进去500个人只是举个例子,至于多少可以自己调整。而活动结束页面一定要用静态页面,不要用数据库。这样就减轻了数据库的压力。
方案二:当有多台服务器时,可以采用分流的形式实现
假设有m张票, 有n台产品服务器接收请求,有x个请求路由服务器随机转发
直接给每台产品服务器分配 m/n张票
每台产品服务器内存做计数器,比如允许m/n*(1+0.1)个人进来。
当内存计数器已满:
后面进的人, 直接跳到到转到活动结束的静态页面,
通知路由服务器,不在路由到这台服务器(这个值得商讨)。
所有产品服务器进来的m/n*(1+0.1)个人再全部转发到一台付款服务器上,进入付款环节,看谁手快了,这时候人少,加锁什么的就简单的。
方案三、如果是单服务器,可以使用Memcache锁来实现
product_key 为票的key
product_lock_key 为票锁key
当product_key存在于memcached中时,所有用户都可以进入下单流程。
当进入支付流程时,首先往memcached存放add(product_lock_key, “1″),
如果返回成功,进入支付流程。
如果不成,则说明已经有人进入支付流程,则线程等待N秒,递归执行add操作。
方案四、借助文件排他锁
在处理下单请求的时候,用flock锁定一个文件,如果锁定失败说明有其他订单正在处理,此时要么等待要么直接提示用户"服务器繁忙"
本文要说的是第4种方案,大致代码如下
阻塞(等待)模式:
$fp = fopen("lock.txt", "w+");
if(flock($fp,LOCK_EX))
{
//..处理订单
flock($fp,LOCK_UN);
}
fclose($fp);
?>
非阻塞模式:
$fp = fopen("lock.txt", "w+");
if(flock($fp,LOCK_EX | LOCK_NB))
{
//..处理订单
flock($fp,LOCK_UN);
}
else
{
echo "系统繁忙,请稍后再试";
}
fclose($fp);
?>
【用Redis轻松实现秒杀系统】
秒杀系统的架构设计
秒杀系统,是典型的短时大量突发访问类问题。对这类问题,有三种优化性能的思路:
写入内存而不是写入硬盘
异步处理而不是同步处理
分布式处理
用上这三招,不论秒杀时负载多大,都能轻松应对。更好的是,Redis能够满足上述三点。因此,用Redis就能轻松实现秒杀系统。
用我这个方案,无论是电商平台特价秒杀,12306火车票秒杀,都不是事:)
下面介绍一下为什么上述三种性能优化思路能够解决秒杀系统的性能问题:
写入内存而不是写入硬盘
传统硬盘的读写性能是相当差的。SSD硬盘比传统硬盘快100倍。而内存又比SSD硬盘快10倍以上。因此,写入内存而不是写入硬盘,就能使系统的能力提升上千倍。也就是说,原来你的秒杀系统可能需要1000台服务器支撑,现在1台服务器就可以扛住了。
你可能会有这样的疑问:写入内存而不是持久化,那么如果此时计算机宕机了,那么写入的数据不就全部丢失了吗?如果你就这么倒霉碰到服务器宕机,那你就没秒到了,有什么大不了?
最后,后面真正处理秒杀订单时,我们会把信息持久化到硬盘中。因此不会丢失关键数据。
Redis是一个缓存系统,数据写入内存后就返回给客户端了,能够支持这个特性。
异步处理而不是同步处理
像秒杀这样短时大并发的系统,在性能负载上有一个明显的波峰和长期的波谷。为了应对相当短时间的大并发而准备大量服务器来应对,在经济上是相当不合算的。
因此,对付秒杀类需求,就应该化同步为异步。用户请求写入内存后立刻返回。后台启动多个线程从内存池中异步读取数据,进行处理。如用户请求可能是1秒钟内进入的,系统实际处理完成可能花30分钟。那么一台服务器在异步情况下其处理能力大于同步情况下1800多倍!
异步处理,通常用MQ(消息队列)来实现。Redis可以看作是一个高性能的MQ。因为它的数据读写都发生在内存中。
分布式处理
好吧。也许你的客户很多,秒杀系统即使用了上面两招,还是捉襟见肘。没关系,我们还有大招:分布式处理。如果一台服务器撑不住秒杀系统,那么就多用几台服务器。10台不行,就上100台。分布式处理,就是把海量用户的请求分散到多个服务器上。一般使用hash实现均匀分布。
这类系统在大数据云计算时代的今天已经有很多了。无非是用Paxos算法和Hash Ring实现的。
Redis Cluster正是这样一个分布式的产品。
使用Redis实现描述系统
Redis和Redis Cluster(分布式版本),是一个分布式缓存系统。其支持多种数据结构,也支持MQ。Redis在性能上做了大量优化。因此使用Redis或者Redis Cluster就可以轻松实现一个强大的秒杀系统。
基本上,你用Redis的这些命令就可以了。
RPUSH key value
插入秒杀请求
当插入的秒杀请求数达到上限时,停止所有后续插入。
后台启动多个工作线程,使用
LPOP key
读取秒杀成功者的用户id,进行后续处理。
或者使用LRANGE key start end命令读取秒杀成功者的用户id,进行后续处理。
每完成一条秒杀记录的处理,就执行INCR key_num。一旦所有库存处理完毕,就结束该商品的本次秒杀,关闭工作线程,也不再接收秒杀请求。
要是还撑不住,该怎么办
也许你会说,我们的客户很多。即使部署了Redis Cluster,仍然撑不住。那该怎么办呢?
记得某个伟人曾经说过:办法总比困难多!
下面,我们具体分析下,还有哪些情况会压垮我们架构在Redis(Cluster)上的秒杀系统。
脚本攻击
如现在有很多抢火车票的软件。它们会自动发起http请求。一个客户端一秒会发起很多次请求。如果有很多用户使用了这样的软件,就可能会直接把我们的交换机给压垮了。
这个问题其实属于网络问题的范畴,和我们的秒杀系统不在一个层面上。因此不应该由我们来解决。很多交换机都有防止一个源IP发起过多请求的功能。开源软件也有不少能实现这点。如Linux上的TC可以控制。流行的Web服务器Nginx(它也可以看做是一个七层软交换机)也可以通过配置做到这一点。一个IP,一秒钟我就允许你访问我2次,其他软件包直接给你丢了,你还能压垮我吗?
交换机撑不住了
可能你们的客户并发访问量实在太大了,交换机都撑不住了。
这也有办法。我们可以用多个交换机为我们的秒杀系统服务。
原理就是DNS可以对一个域名返回多个IP,并且对不同的源IP,同一个域名返回不同的IP。如网通用户访问,就返回一个网通机房的IP;电信用户访问,就返回一个电信机房的IP。也就是用CDN了!
我们可以部署多台交换机为不同的用户服务。 用户通过这些交换机访问后面数据中心的Redis Cluster进行秒杀作业。
总结
有了Redis Cluster的帮助,做个支持海量用户的秒杀系统其实So Easy!
这里介绍的方案虽然是针对秒杀系统的,但其背后的原理对其他高并发系统一样有效。
最后,我们再重温一下高性能系统的优化原则:
写入内存而不是写入硬盘
异步处理而不是同步处理
分布式处理
【PHP使用php-resque库配合Redis实现MQ消息队列的教程】
消息队列处理后台任务带来的问题
项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操作,显然会有更好的用户体验。
为了实现类似的需求,Web项目中一般的实现方法是使用消息队列(Message Queue),比如MemcacheQ,RabbitMQ等等,都是很著名的产品。
消息队列说白了就是一个最简单的先进先出队列,队列的一个成员就是一段文本。正是因为消息队列实在太简单了,当拿着消息队列时,反而有点无从下手的感觉,因为这仅仅一个发送邮件的任务,就会引申出很多问题:
消息队列只能存储字符串类型的数据,如何将一个发送邮件这样的“任务”,转换为消息队列中的一个“消息”?
消息队列只负责数据的存放与进出,本身不能执行任何程序,那么我们要如何从消息队列中一个一个取出数据,再将这些数据转化回任务并执行。
我们无法预知消息队列何时会有数据产生,所以我们的任务执行程序还需要具备监控消息队列的能力,也就是一个常驻后台的守护进程。
一般的Web应用PHP都以cgi方式运行,无法常驻内存。我们知道php还有cli模式,那么守护进程是否能以php cli来实现,效率如何?
当守护进程运行时,Web应用能否与后台守护进程交互,实现开启/杀死进程的功能以及获得进程的运行状态?
Resque对后台任务的设计与角色划分
对以上这些问题,目前为止我能找到的最好答案,并不是来自php,而是来自Ruby的项目Resque,正是由于Resque清晰简单的解决了后台任务带来的一系列问题,Resque的设计也被Clone到Python、php、NodeJs等语言:比如Python下的pyres以及PHP下的php-resque等等,这里有各种语言版本的Resque实现,而在本篇日志里,我们当然要以PHP版本为例来说明如何用php-resque运行一个后台任务,可能一些细节方面会与Ruby版有出入,但是本文中以php版为准。
Resque是这样解决这些问题的:
后台任务的角色划分
其实从上面的问题已经可以看出,只靠一个消息队列是无法解决所有问题的,需要新的角色介入。在Resque中,一个后台任务被抽象为由三种角色共同完成:
Job | 任务 : 一个Job就是一个需要在后台完成的任务,比如本文举例的发送邮件,就可以抽象为一个Job。在Resque中一个Job就是一个Class。
Queue | 队列 : 也就是上文的消息队列,在Resque中,队列则是由Redis实现的。Resque还提供了一个简单的队列管理器,可以实现将Job插入/取出队列等功能。
Worker | 执行者 : 负责从队列中取出Job并执行,可以以守护进程的方式运行在后台。
那么基于这个划分,一个后台任务在Resque下的基本流程是这样的:
将一个后台任务编写为一个独立的Class,这个Class就是一个Job。
在需要使用后台程序的地方,系统将Job Class的名称以及所需参数放入队列。
以命令行方式开启一个Worker,并通过参数指定Worker所需要处理的队列。
Worker作为守护进程运行,并且定时检查队列。
当队列中有Job时,Worker取出Job并运行,即实例化Job Class并执行Class中的方法。
至此就可以完整的运行完一个后台任务。
在Resque中,还有一个很重要的设计:一个Worker,可以处理一个队列,也可以处理很多个队列,并且可以通过增加Worker的进程/线程数来加快队列的执行速度。
php-resque的安装
需要提前说明的是,由于涉及到进程的开辟与管理,php-resque使用了php的PCNTL函数,所以只能在Linux下运行,并且需要php编译PCNTL函数。如果希望用Windows做同样的工作,那么可以去找找Resque的其他语言版本,php在Windows下非常不适合做后台任务。
以Ubuntu12.04LTS为例,Ubuntu用apt安装的php已经默认编译了PCNTL函数,无需任何配置,以下指令均为root帐号
安装Redis
?
apt-get install redis-server
安装Composer
apt-get install curl
cd /usr/local/bin
curl -s http://getcomposer.org/installer | php
chmod a+x composer.phar
alias composer='/usr/local/bin/composer.phar'
使用Composer安装php-resque
假设web目录在/opt/htdocs
apt-get install git git-core
cd /opt/htdocs
git clone git://github.com/chrisboulton/php-resque.git
cd php-resque
composer install
php-resque的使用
1.编写一个Worker
其实php-resque已经给出了简单的例子, demo/job.php文件就是一个最简单的Job:
class PHP_Job
{
public function perform()
{
sleep(120);
fwrite(STDOUT, 'Hello!');
}
}
这个Job就是在120秒后向STDOUT输出字符Hello!
在Resque的设计中,一个Job必须存在一个perform方法,Worker则会自动运行这个方法。
2.将Job插入队列
php-resque也给出了最简单的插入队列实现 demo/queue.php:
if(empty($argv[1])) {
die('Specify the name of a job to add. e.g, php queue.php PHP_Job');
}
require __DIR__ . '/init.php';
date_default_timezone_set('GMT');
Resque::setBackend('127.0.0.1:6379');
$args = array(
'time' => time(),
'array' => array(
'test' => 'test',
),
);
$jobId = Resque::enqueue('default', $argv[1], $args, true);
echo "Queued job ".$jobId."\n\n";
在这个例子中,queue.php需要以cli方式运行,将cli接收到的第一个参数作为Job名称,插入名为'default'的队列,同时向屏幕输出刚才插入队列的Job Id。在终端输入:
php demo/queue.php PHP_Job
结果可以看到屏幕上输出:
Queued job b1f01038e5e833d24b46271a0e31f6d6
即Job已经添加成功。注意这里的Job名称与我们编写的Job Class名称保持一致:PHP_Job
3.查看Job运行情况
php-resque同样提供了查看Job运行状态的例子,直接运行:
php demo/check_status.php b1f01038e5e833d24b46271a0e31f6d6
可以看到输出为:
Tracking status of b1f01038e5e833d24b46271a0e31f6d6. Press [break] to stop.
Status of b1f01038e5e833d24b46271a0e31f6d6 is: 1
我们刚才创建的Job状态为1。在Resque中,一个Job有以下4种状态:
Resque_Job_Status::STATUS_WAITING = 1; (等待)
Resque_Job_Status::STATUS_RUNNING = 2; (正在执行)
Resque_Job_Status::STATUS_FAILED = 3; (失败)
Resque_Job_Status::STATUS_COMPLETE = 4; (结束)
因为没有Worker运行,所以刚才创建的Job还是等待状态。
4.运行Worker
这次我们直接编写demo/resque.php:
date_default_timezone_set('GMT');
require 'job.php';
require '../bin/resque';
可以看到一个Worker至少需要两部分:
可以直接包含Job类文件,也可以使用php的自动加载机制,指定好Job Class所在路径并能实现自动加载
包含Resque的默认Worker: bin/resque
在终端中运行:
QUEUE=default php demo/resque.php
前面的QUEUE部分是设置环境变量,我们指定当前的Worker只负责处理default队列。也可以使用
QUEUE=* php demo/resque.php
来处理所有队列。
运行后输出为
#!/usr/bin/env php
*** Starting worker
用ps指令检查一下:
ps aux | grep resque
可以看到有一个php的守护进程已经在运行了
1000 4607 0.0 0.1 74816 11612 pts/3 S+ 14:52 0:00 php demo/resque.php
再使用之前的检查Job指令
php demo/check_status.php b1f01038e5e833d24b46271a0e31f6d6
2分钟后可以看到
Status of b1f01038e5e833d24b46271a0e31f6d6 is: 4
任务已经运行完毕,同时屏幕上应该可以看到输出的Hello!