发布时间:2023-10-09 17:30
我们之前在SpringBoot项目里面使用定时任务,先是开启定时任务 @EnableScheduling
,然后使用 @Scheduled(cron = \"*/1 * * * * ?\")
,这样使用起来很简单,也没有什么问题。
但思考一下这样的场景,如果一个服务已经满足不了我们的需求,这时候应该怎么办呢?我们很容易想到集群,部署多份。
一般我们的服务部署多份只需要前面加一个负载的功能就行,毕竟每个服务提供的是一样的服务。但是定时任务则不行,比如我们A服务的fun方法运行到一半,然后B服务的fun开始运行,这样肯定会出现问题的。
而我们的ElasticJob则可以很好的去解决这个问题,并且可以很好的支持水平扩容/缩容。当然了它还附带了其它的功能比如好看好用的操作界面、异常通知(邮件、企业微信、钉钉)等。
ElasticJob的使用是依赖zookeeper的(多个服务之间肯定是要通信才可以做到上面的功能),这个就自己去安装一下了,如果只是测试可以安装一个windows版本很简单。
ElasticJob里面提供两个模块,ElasticJob-Lite、ElasticJob-Cloud,我们下面只介绍ElasticJob-Lite的使用。
官方文档
如果修改yml文件不生效,可有如下两个办法
overwrite: true
作业其实就是定时任务,每一个作业就是一个定时任务。
作业的种类有多种简单作业、数据流作业、脚本作业、HTTP作业(3.0.0-beta 提供)
普通作业只需要实现SimpleJob接口,然后重写execute方法(也是用的最多的作业)
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;
@Component
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
System.out.println(context.getShardingTotalCount() + \" \" + context.getShardingItem());
}
}
MyDataflowJob
import com.xdx97.elasticjob.bean.XdxBean;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
public class MyDataflowJob implements DataflowJob<XdxBean> {
@Override
public List<XdxBean> fetchData(ShardingContext shardingContext) {
List<XdxBean> foos = new ArrayList<>();
double random = Math.random();
System.out.println(\"fetchData------ \" + random);
if (random > 0.5){
XdxBean foo = new XdxBean();
foo.setName(\"小道仙\");
foos.add(foo);
}
return foos;
}
@Override
public void processData(ShardingContext shardingContext, List<XdxBean> list) {
System.out.println(\"来了processData------\");
}
}
XdxBean
public class XdxBean {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
运行结果:
fetchData------ 0.13745888666984807
fetchData------ 0.2922741337641118
fetchData------ 0.7834818165147507
来了processData------
fetchData------ 0.8177868853353837
来了processData------
fetchData------ 0.14076346085285385
注:Math.random()
产生的数据在0-1之间。
从上面运行的结果,我们可以得出结论,所谓的数据流作业其实也是一个定时任务,只不过当这个定时任务产生数据的时候,就会携带数据去调用processData()
方法
感觉用的不多和时间关系就不研究了
感觉用的不多和时间关系就不研究了
ElasticJob提供了三种使用方法,基于Java、基于SpringBoot、基于配置文件。三种都介绍也很麻烦,原理都一样,我这里就只介绍SpringBoot的。
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starterartifactId>
<version>2.2.0.RELEASEversion>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
<version>2.2.0.RELEASEversion>
dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjobgroupId>
<artifactId>elasticjob-lite-spring-boot-starterartifactId>
<version>3.0.0-RC1version>
dependency>
这里有个坑,官方文档里面只说我们有和SpringBoot整合,但是没有提供完整的pom文件,导致卡了我好久。
这个starter里面有数据库相关的连接,我们只是简单测试,不想配置数据源的话,改一下启动类注解即可
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
server:
port: 8085
elasticjob:
regCenter:
#zookeeper 的ip:port
serverLists: 127.0.0.1:2181
#名命空间,自己定义就好了
namespace: my-job4
jobs:
#你的这个定时任务名称,自定义名称
myElasticJob:
#定时任务的全路径名
elasticJobClass: com.elastic.job.MyElasticJob
#定时任务执行的cron表达式
cron: 0/5 * * * * ?
#分片数量
shardingTotalCount: 10
它是水平扩展的核心,比如上面定义了10个片(对应的片名是0-9),假设我们的定时任务是每1分钟执行一次,定时方法是execute。
当我们只有一台服务器的时候,那么每1分钟会调用十次execute(每次调用的时候分片名(0-9)都不一样)。
当我们有两台服务器的时候,那么每1分钟A、B服务器各自调用五次execute(每次调用的时候分片名(A0-4,B5-9)都不一样)
基于上面这个说明,我们可以以此类推,当有三台服务器的时候A(3个),B(3个),C(4个),这样水平扩展就很容易了。
基于上面的分片功能,我们的作业也需要修改一下
public void execute(ShardingContext context) {
System.out.println(context.getShardingTotalCount() + \" \" + context.getShardingItem());
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
假如你把下面的测试代码复制五份,然后一次启动(稍等间隔一会),你会发现每个服务器所分得得片区在变化。依次关闭服务也会发现这样的规律。
比如我们需要临时去调用一下这个定时任务,但是它得间隔时间是几个小时亦或者是更久。
官方给的操作如下,但是我尝试了很久依旧没有解决错误,所以我按照基于Java的调用方式实现了一下
实现代码如下:
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OneOffJobController {
@GetMapping(\"/execute\")
public String executeOneOffJob() {
OneOffJobBootstrap jobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new MyElasticJob(), createJobConfiguration());
// 可多次调用一次性调度
jobBootstrap.execute();
return \"success\";
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(\"127.0.0.1:2181\", \"my-job4\"));
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
// 创建作业配置
return JobConfiguration.newBuilder(\"myElasticJob\", 10).cron(\"\").build();
}
}
其实上面这个可以做成通用,参数都是用外部传参的方式。至于job对象,我们可以传递一个字符串然后通过反射来创建一个对象。
Class classType = Class.forName(\"com.elastic.job.MyElasticJob\");
ElasticJob obj = (ElasticJob)classType.newInstance();
等后面配置了运维界面,发现不需要如此麻烦,直接在运维界面点一下即可(不需要你提供接口)
就是当我们定时任务执行处了异常,要做些什么处理。官方提供了下面的6种策略,我这里就演示一个邮件通知策略,有兴趣或有需求的可以去研究一下别的策略。
错误处理策略名称 | 说明 | 是否内置 | 是否默认 | 是否需要额外配置 |
---|---|---|---|---|
记录日志策略 | 记录作业异常日志,但不中断作业执行 | 是 | 是 | |
抛出异常策略 | 抛出系统异常并中断作业执行 | 是 | ||
忽略异常策略 | 忽略系统异常且不中断作业执行 | 是 | ||
邮件通知策略 | 发送邮件消息通知,但不中断作业执行 | 是 | ||
企业微信通知策略 | 发送企业微信消息通知,但不中断作业执行 | 是 | ||
钉钉通知策略 | 发送钉钉消息通知,但不中断作业执行 | 是 |
<dependency>
<groupId>org.apache.shardingsphere.elasticjobgroupId>
<artifactId>elasticjob-error-handler-emailartifactId>
<version>3.0.0-RC1version>
dependency>
server:
port: 8085
elasticjob:
regCenter:
#zookeeper 的ip:port
serverLists: 127.0.0.1:2181
#名命空间,自己定义就好了
namespace: my-job5
jobs:
#你的这个定时任务名称,自定义名称
oneSimpleJob:
#定时任务的全路径名
elasticJobClass: com.xdx97.elasticjob.job.OneSimpleJob
#定时任务执行的cron表达式
cron: 0/30 * * * * ?
#分片数量
shardingTotalCount: 1
jobErrorHandlerType: EMAIL
overwrite: true
props:
email:
#邮件服务器地址
host: smtp.126.com
#邮件服务器端口
port: 465
#邮件服务器用户名
username: xxxxxxx
#邮件服务器密码
password: xxxxx
#是否启用 SSL 加密传输
useSsl: true
#邮件主题
subject: ElasticJob error message
#发送方邮箱地址
from: xxxxx@126.com
#接收方邮箱地址
to: xxx@qq.com
#抄送邮箱地址
cc: xxxxxx
#密送邮箱地址
bcc: xxxxx
# 是否开启调试模式
debug: false
我们只需要在上面的定时任务里面抛出一个异常,就会收到邮件了。
这里面有一个坑,看官方文档里面我们以为这个props和jobs是一个等级的其实不是
如果你要配置多个定时任务,并且每个定时任务的通知对象都不一样,可以把上面配置多份,毕竟每个props属于每个定时任务。
ElasticJob-Lite 提供作业监听器,用于在任务执行前和执行后执行监听的方法。 监听器分为每台作业节点均执行的常规监听器和分布式场景中仅单一节点执行的分布式监听器。 本章节将详细介绍他们的使用方式。
在作业依赖(DAG)功能开发完成之后,可能会考虑删除作业监听器功能。
作业监听器文档这块并未提供springboot的方式,我自己尝试了许久也不成功,提了issue暂时没回复,就先这样吧。
如果说上面的分布式分片定时任务让我觉得它牛逼,那么这个运维平台就真的让我眼前一亮了。
这个ui界面是单独的界面,它和我们具体的服务没有任何关联。所以完全不需要修改之前的任何代码,我们只需要下载它,并且启动它就好了。
https://www.apache.org/dyn/closer.cgi/shardingsphere/elasticjob-ui-3.0.0-RC1/apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz
这里说一下,如果你直接在window下解压启动会报一个错(找不到启动类),这个是因为解压的问题。 (压缩好的包,我也放在下面源代码里面了)
我的解决办法是把压缩包上传到linux上,然后解压后再拉下来。
tar zxvf apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar
浏览器打开 http://localhost:8088/,用户名/密码:root/root
然后去全局配置>注册中心配置,配置上你的zookeeper地址即可
具体操作都很简单,只是点击按钮就不说了
https://www.bilibili.com/video/BV19L411p7qR
关注下面微信公众号,回复关键字获取:elasticJobLiteDemo